当前位置: 首页 > news >正文

WebApplicationType.REACTIVE 的webSocket 多实例问题处理

  1. 配置类
@Configuration
public class WebFluxWebSocketConfig {/** 让 Spring 注入已经带依赖的 Handler */@Beanpublic HandlerMapping webSocketMapping(WebSocketReceivedHandler handler) {return new SimpleUrlHandlerMapping(Map.of("/api/xxx/ws", handler),   // 用注入的 handler-1);}@Beanpublic WebSocketHandlerAdapter handlerAdapter() {return new WebSocketHandlerAdapter();}
}
@Configuration
public class RedisPubSubConfig {@Beanpublic RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory,RedisBroadcastListener listener) {RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(connectionFactory);container.addMessageListener(listener, new ChannelTopic("ws-broadcast"));return container;}
}
  1. handler
@Component
@RequiredArgsConstructor
@Slf4j
public class WebSocketReceivedHandler implements WebSocketHandler {@Autowiredprivate AiBroadcastEventHandlerDispatcher<?, ?> dispatcher;@Autowiredprivate WsSessionPool wsSessionPool;@Overridepublic @NotNull Mono<Void> handle(@NotNull WebSocketSession session) {log.info("websocket 连接成功,sessionId:{}", session.getId());wsSessionPool.add(session);String sid = session.getId();// 处理客户端请求消息,生成响应消息流Flux<WebSocketMessage> inputFlux = session.receive().map(WebSocketMessage::getPayloadAsText).flatMap(payload -> dispatcher.doDispatch(session, payload).map(session::textMessage));// 服务端广播消息流Flux<WebSocketMessage> broadcastFlux = wsSessionPool.getPersonalFlux(sid).map(session::textMessage);// 合并两个流,确保 session.send 只调用一次Flux<WebSocketMessage> mergedFlux = Flux.merge(inputFlux, broadcastFlux);log.info("websocket 开始发送消息,sessionId:{}", session.getId());return session.send(mergedFlux).doFinally(sig -> {wsSessionPool.remove(session);log.info("websocket 关闭,sessionId:{},signal:{}", session.getId(), sig);});}
}

3.连接池

@Component
@Slf4j
public class WsSessionPool {/** session 实体 */private final Map<String, WebSocketSession> SESSIONS = new ConcurrentHashMap<>();/** 消息推送通道:Replay 可以避免未订阅者时失败,limit(1) 限制内存 */private final Map<String, Sinks.Many<String>> SINKS = new ConcurrentHashMap<>();/** 添加新的连接 */public void add(WebSocketSession session) {String sid = session.getId();SESSIONS.put(sid, session);// 推荐使用 replay 降低 emit 失败风险Sinks.Many<String> sink = Sinks.many().replay().limit(1);SINKS.put(sid, sink);log.info("WS 连接上线: {}, 当前连接数={}", sid, SESSIONS.size());}/** 移除连接(主动或异常关闭) */public void remove(WebSocketSession session) {removeById(session.getId());}public void removeById(String sessionId) {SESSIONS.remove(sessionId);Sinks.Many<String> sink = SINKS.remove(sessionId);if (sink != null) {sink.tryEmitComplete(); // 通知关闭}log.info("WS 连接下线: {}, 当前连接数={}", sessionId, SESSIONS.size());}/** 获取指定 session 的推送 Flux */public Flux<String> getPersonalFlux(String sessionId) {Sinks.Many<String> sink = SINKS.get(sessionId);if (sink == null) {log.warn("sessionId={} 不存在,返回空流", sessionId);return Flux.empty();}return sink.asFlux().doOnCancel(() -> {log.info("sessionId={} Flux 被取消订阅", sessionId);removeById(sessionId);}).doOnTerminate(() -> {log.info("sessionId={} Flux 被终止", sessionId);removeById(sessionId);});}/** 广播推送消息到所有连接 */public void broadcast(String json) {for (Map.Entry<String, Sinks.Many<String>> entry : SINKS.entrySet()) {String sid = entry.getKey();Sinks.Many<String> sink = entry.getValue();Sinks.EmitResult result = sink.tryEmitNext(json);if (result.isFailure()) {log.warn("广播失败 sid={}, result={}, 自动移除连接", sid, result);removeById(sid);}}log.info("广播成功,消息: {}, 当前在线连接: {}", json, SINKS.size());}
}
  1. 心跳机制
@Component
@Log4j2
public class WsHeartbeatTask {private final WsSessionPool wsSessionPool;public WsHeartbeatTask(WsSessionPool wsSessionPool) {this.wsSessionPool = wsSessionPool;}@PostConstructpublic void init() {log.info("WebSocket心跳任务已启动");}// 每30秒广播一个心跳消息@Scheduled(fixedRate = 30_000)public void sendHeartbeat() {String timeStr = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));String json = String.format("{\"type\":\"ping\",\"timestamp\":\"%s\"}", timeStr);wsSessionPool.broadcast(json);}
}
  1. listener
@Component
@Log4j2
public class RedisBroadcastListener implements MessageListener {private final WsSessionPool wsSessionPool;public RedisBroadcastListener(WsSessionPool wsSessionPool) {this.wsSessionPool = wsSessionPool;}@Overridepublic void onMessage(Message message, byte[] pattern) {String body = new String(message.getBody(), StandardCharsets.UTF_8);log.info("Redis广播监听器收到消息:{}", body);wsSessionPool.broadcast(body);}
}
  1. 监听kafka 转成redis发送websocket消息
@Component
@Log4j2
public class MapAlarmMsgHandler implements AiDetectionScreenMessageHandler<List<FaultAlarmVO>> {// 用于发布消息@Autowiredprivate StringRedisTemplate redisTemplate;@Overridepublic Integer messageType() {return AiBroadcastEventEnum.MAP_ALARM_CHARGING.getCode();}@Overridepublic void handler(AiDetectionMessageRedisEvent event) {log.info("FaultAlarmMsgHandler:{}", event);try {List<FaultAlarmVO> vo = getSource(event);Mono<WebSocketResponse<List<FaultAlarmVO>>> response = WebSocketResponse.ok(AiBroadcastEventEnum.MAP_ALARM_CHARGING.getCode(), vo);WebSocketResponse<List<FaultAlarmVO>> block = response.block();String json = JSONUtil.toJsonStr(block);redisTemplate.convertAndSend("ws-broadcast", json);log.info("FaultAlarmMsgHandler-广播消息转发到 Redis:{}", json);} catch (Exception e) {log.error("消息处理error:{}", event, e);}}@Overridepublic List<FaultAlarmVO> getSource(AiDetectionMessageRedisEvent event) {return (List<FaultAlarmVO>) event.getContent();}
}
http://www.lryc.cn/news/589132.html

相关文章:

  • 网络模型
  • TCP协议可靠性设计的核心机制与底层逻辑
  • 计算机系统方向可发会议/期刊参考时间
  • PostgreSQL 超详细安装与使用教程:从入门到实战
  • 【实时Linux实战系列】实时数据流的网络传输
  • Flutter Socket 连接方案分析与适用场景
  • 国产化Excel处理组件Spire.XLS教程:在 C# 中生成 Excel文件
  • excel 通过openpyxl表格下载和插入图片
  • 给 Excel 整列空格文字内容加上前缀:像给文字穿衣服一样简单!
  • Vue获取上传Excel文件内容并展示在表格中
  • 【YOLOv11-目标检测】06-模型部署(C++)
  • Sentinel热点参数限流完整示例实现
  • 【PCIe 总线及设备入门学习专栏 5.1.1 -- PCIe PERST# 信号的作用】
  • 【PCIe 总线及设备入门学习专栏 5.1.2 -- PCIe EP core_rst_n 与 app_rst_n】
  • Excel制作玫瑰图
  • 大语言模型:高考志愿填报的“新纪元智能参谋”
  • 20250715问答课题-基于BERT与混合检索问答系统
  • 论文阅读:arxiv 2025 A Survey on Data Contamination for Large Language Models
  • 第八章,应用题
  • OpenCV 对数变换函数logTransform()
  • 【机器学习】第一章 概述
  • 【机器学习】第二章 Python入门
  • 【安卓笔记】RxJava之flatMap的使用
  • PyTorch笔记6----------神经网络案例
  • 【人工智能99问】神经网络的工作原理是什么?(4/99)
  • Android中Launcher简介
  • MySQL索引与事务详解:用大白话讲透核心概念
  • compose、 pipe 组合函数实现
  • 从底层技术到产业落地:优秘企业智脑的 AI 革命路径解析
  • Basilisk库教程(二)