当前位置: 首页 > news >正文

VUE+Spring Flux实现SSE长连接

VUE代码

        // 初始化EventSourceinitEventSource(url) {const token = getAccessToken();const eventSource = new EventSourcePolyfill(url, {headers: {'Authorization': `Bearer ${token}`,'tenant-id': getTenantId(),}});eventSource.onerror = (e) => {console.log("SSE连接错误", e);if (e.readyState === EventSource.CLOSED || eventSource.readyState === EventSource.CONNECTING) {console.log("SSE连接已关闭或正在重连");} else {// 当发生错误时,尝试重新连接if (reconnectAttempts < maxReconnectAttempts) {console.log(`尝试第${reconnectAttempts + 1}次重连`);reconnectAttempts++;setTimeout(() => {eventSource.close(); // 关闭当前连接this.initEventSource(url); // 重新初始化EventSource}, reconnectDelay * reconnectAttempts);} else {console.error("达到最大重连次数,不再尝试重连");}}};eventSource.addEventListener("message", res => {const data = JSON.parse(res.data)if (data.type == 2) {this.unreadCount = data.content;}if (data.type == 1) {this.createNotify(data)}})},

后端采用redis做管道,能够兼容分布式服务

JAVA 监听接口

 @GetMapping(value = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)public Flux<String> streamEvents() {Long loginUserId = SecurityFrameworkUtils.getLoginUserId();USER_IDS.add(loginUserId);SseMessageVO heartbeat = new SseMessageVO().setType(SseNotifyTypeEnum.HEARTBEAT.getType()).setUserId(loginUserId).setContent("Heartbeat");return  reactiveRedisOperations.listenToChannel(SseService.getDestination(loginUserId)).map(data -> sendMsg(loginUserId,data.getMessage(),heartbeat)).publishOn(Schedulers.boundedElastic()).doOnSubscribe(subscription -> {// 订阅时发送一次心跳,确认连接heartbeat(heartbeat);});}private String sendMsg(Long loginUserId,String sseMessage,SseMessageVO heartbeat){SseMessageVO sseMessageVO = JSONUtil.toBean(sseMessage, SseMessageVO.class);if (null != sseMessageVO && Objects.equals(sseMessageVO.getUserId(), loginUserId)){return JSONUtil.toJsonStr(sseMessageVO);}return JSONUtil.toJsonStr(heartbeat);}/*** 登录时心跳*/private void heartbeat(SseMessageVO heartbeat) {ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();executorService.schedule(() -> {sseService.publishEventToChannel(heartbeat).subscribe();}, 1, TimeUnit.SECONDS);executorService.shutdown();}/*** 保活*/@PostConstructpublic void heartbeatTimer() {ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();executor.scheduleAtFixedRate(() -> {if (CollectionUtil.isNotEmpty(USER_IDS)){for (Long userId : USER_IDS) {String message = "Heartbeat at " + LocalDateTime.now();SseMessageVO heartbeat = new SseMessageVO().setType(SseNotifyTypeEnum.HEARTBEAT.getType()).setUserId(userId).setContent(message);sseService.publishEventToChannel(heartbeat).subscribe();}}}, 0, 10, TimeUnit.SECONDS);}

JAVA 提交数据服务

@Component
public class SseService {@Resourceprivate ReactiveRedisOperations<String, String> reactiveRedisOperations;private static final String DESTINATION = "event-channel-user:";/*** 获取指定通道* @param userId* @return*/public static String getDestination(Long userId){return DESTINATION+userId;}/*** 推送事件到通道* @param sseMessageVO* @return*/public Mono<Long> publishEventToChannel(SseMessageVO sseMessageVO) {return reactiveRedisOperations.convertAndSend(getDestination(sseMessageVO.getUserId()), JSONUtil.toJsonStr(sseMessageVO));}}

http://www.lryc.cn/news/399302.html

相关文章:

  • C#实现Winform程序右下角弹窗消息提示
  • Java三剑客:封装、继承、多态的魔法世界
  • 0145__Linux的capability
  • # Redis 入门到精通(一)数据类型(4)
  • 西邮计科嵌入式复习
  • Java如何使用 HttpClientUtils 发起 HTTP 请求
  • 无人机的工作原理
  • 敏捷开发笔记(第10章节)--Liskov原则(LSP)
  • 基于SSM的校园一卡通管理系统的设计与实现
  • 新版Android Studio中设置gradle的JDK版本
  • 打造你的智能家居指挥中心:基于STM32的多协议(zigbee、http)网关(附代码示例)
  • 【基于R语言群体遗传学】-16-中性检验Tajima‘s D及连锁不平衡 linkage disequilibrium (LD)
  • 防火墙组网与安全策略实验
  • xmind梳理测试点,根据这些测试点去写测试用例
  • MICCAI 2024 每日一篇论文 纯纯直读 CUTS:用于多粒度无监督医学图像分割的深度学习和拓扑框架
  • 实验9 存储过程与函数的创建管理实验
  • 计算机网络--tcpdump和iptable设置、内核参数优化策略
  • Vue3框架搭建2:axios+typescript封装
  • 【机器学习】使用决策树分类器预测汽车安全性的研究与分析
  • 【香橙派 Orange pi AIpro】| 开发板深入使用体验
  • 初识Laravel(Laravel的项目搭建)
  • RequestContextHolder多线程获取不到request对象
  • 打造高效工作与生活质量的完美平衡
  • 【零基础】学JS之APIS第四天
  • 走进linux
  • 智能家居开发新进展:乐鑫 ESP-ZeroCode 与亚马逊 ACK for Matter 实现集成
  • 本地事务和分布式事务
  • 昇思25天学习打卡营第14天|基于MindNLP的文本解码原理
  • Base64文件流查看下载PDF方法-CSDN
  • 基于TCP的在线词典系统(分阶段实现)(阻塞io和多路io复用(select)实现)