当前位置: 首页 > article >正文

SpringBoot+vue+SSE+Nginx实现消息实时推送

一、背景

        项目中消息推送,简单的有短轮询、长轮询,还有SSE(Server-Sent Events)、以及最强大复杂的WebSocket。

        至于技术选型,SSE和WebSocket区别,网上有很多,我也不整理了,大佬的链接

《网页端IM通信技术快速入门:短轮询、长轮询、SSE、WebSocket》。

        其实实现很简单,写这篇文章的目的,主要是将处理过程中的一些问题,记录解决方案。

二、后端实现

        其实这里网上也是很多demo,我简写一点demo

1、引入依赖

这里需要用的下面依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId>
</dependency>

2、sse工具类

package com.asiainfo.common.utils.sse;import lombok.extern.slf4j.Slf4j;
import org.springframework.http.MediaType;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;/*** Des: sse工具类* Author: SiQiangMing 2025/5/28 15:50*/
@Slf4j
public class SseEmitterUtil {/*** 使用map对象,便于根据userId来获取对应的SseEmitter,或者放redis里面*/private final static Map<Long, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();/*** 用户创建sse链接* Author: SiQiangMing 2025/5/28 17:21* @param userId: 用户id* @return org.springframework.web.servlet.mvc.method.annotation.SseEmitter*/public static SseEmitter connect(Long userId) {// 设置超时时间,0表示不过期。默认30S,超时时间未完成会抛出异常:AsyncRequestTimeoutExceptionSseEmitter sseEmitter = new SseEmitter(0L);// 注册回调sseEmitter.onCompletion(completionCallBack(userId));sseEmitter.onError(errorCallBack(userId));sseEmitter.onTimeout(timeoutCallBack(userId));sseEmitterMap.put(userId, sseEmitter);log.info("----------------------------创建新的 SSE 连接,当前用户 {}, 连接总数 {}", userId, sseEmitterMap.size());return sseEmitter;}/*** 给制定用户发送消息* Author: SiQiangMing 2025/5/28 17:21* @param userId: 用户id* @param sseMessage: 消息* @return void*/public static void sendMessage(Long userId, String sseMessage) {if (sseEmitterMap.containsKey(userId)) {try {sseEmitterMap.get(userId).send(sseMessage);log.info("----------------------------用户 {} 推送消息 {}", userId, sseMessage);} catch (IOException e) {log.error("----------------------------用户 {} 推送消息异常", userId);disconnect(userId);}} else {log.error("----------------------------消息推送 用户 {} 不存在,链接总数 {}", userId, sseEmitterMap.size());}}/*** 判断用户是否存在sse链接* Author: SiQiangMing 2025/5/29 10:02* @param userId:* @return boolean*/public static boolean checkSseExist(Long userId) {if (userId == null) {return false;}return sseEmitterMap.containsKey(userId);}/*** 群发所有人,这里用来测试异常的排除链接* Author: SiQiangMing 2025/5/28 17:20* @param message: 消息* @return void*/public static void batchSendMessage(String message) {sseEmitterMap.forEach((k, v) -> {try {v.send(message, MediaType.APPLICATION_JSON);} catch (IOException e) {log.error("----------------------------用户 {} 推送异常", k);disconnect(k);}});}/*** 移除用户连接* Author: SiQiangMing 2025/5/28 17:20* @param userId: 用户id* @return void*/public static void disconnect(Long userId) {if (sseEmitterMap.containsKey(userId)) {sseEmitterMap.get(userId).complete();sseEmitterMap.remove(userId);log.info("----------------------------移除用户 {}, 剩余连接 {}", userId, sseEmitterMap.size());} else {log.error("-----------------------------移除用户 {} 已被移除,剩余连接 {}", userId, sseEmitterMap.size());}}/*** 结束回调* Author: SiQiangMing 2025/5/28 17:19* @param userId: 用户id* @return java.lang.Runnable*/private static Runnable completionCallBack(Long userId) {return () -> {log.info("----------------------------用户 {} 结束连接", userId);};}/*** 超时回调* Author: SiQiangMing 2025/5/28 17:20* @param userId: 用户id* @return java.lang.Runnable*/private static Runnable timeoutCallBack(Long userId) {return () -> {log.error("----------------------------用户 {} 连接超时", userId);disconnect(userId);};}/*** 异常回调* Author: SiQiangMing 2025/5/28 17:20* @param userId: 用户id* @return java.util.function.Consumer<java.lang.Throwable>*/private static Consumer<Throwable> errorCallBack(Long userId) {return throwable -> {log.error("----------------------------用户 {} 连接异常", userId);disconnect(userId);};}
}

三、前端

前端创建链接,请求后端的创建接口,注意页面销毁的时候,关闭sse链接

mounted() {// sse链接createEventSource() {// newthis.eventSource = new EventSource("后端的创建sse路径");// 收到消息this.eventSource.onmessage = (e) => {// 消息处理 e.data};// 异常处理this.eventSource.onerror = (error) => {console.error(error);};},
},
unmounted() {// 组件销毁时关闭 SSE 连接if (this.eventSource) {this.eventSource.close();}
},

3、创建sse

    /*** 处理客户端的连接请求* Author: SiQiangMing 2025/5/26 16:06* @return org.springframework.web.servlet.mvc.method.annotation.SseEmitter*/@GetMapping(value = "/xxx", produces = MediaType.TEXT_EVENT_STREAM_VALUE)public SseEmitter xxx() {// 返回 SseEmitter 给客户端Long userId = SecurityUtils.getUserId();SseEmitter sseEmitter = SseEmitterUtil.connect(userId);// 可以直接带初始化信息返回SseEmitterUtil.sendMessage(userId, "消息");return sseEmitter;}

 四、nginx配置

        在这里遇到了一些问题,记录下解决方案。

1、在idea开发工具都正常,部署到生产环境后,sse后端能推送,前端没有收到消息。排查浏览器网络请求,nginx日志发现,客户端主动关闭了链接。

        在nginx.conf中增加配置

location /精准匹配sse创建路径 {add_header 'Cache-Control' 'no-cache'; //不缓存数据,每次请求时都会从服务器获取最新的内容proxy_set_header Connection ''; // 的作用是清除或覆盖 Connection头proxy_http_version 1.1;proxy_set_header Host $host; //确保后端服务器接收到的 Host 值与客户端原始请求的 Host 一致,或符合后端服务器的预期。proxy_pass http://xxx:port/sse创建路径;
}

2、SSE链接一分钟请求一次,频繁创建。

        在之前的配置中追加配置

location /精准匹配sse创建路径 {proxy_connect_timeout 3600s; // 解决1分钟重连,proxy_send_timeout 3600s; // 解决1分钟重连,proxy_read_timeout 3600s; // 解决1分钟重连,add_header 'Cache-Control' 'no-cache'; //不缓存数据,每次请求时都会从服务器获取最新的内容proxy_set_header Connection ''; // 的作用是清除或覆盖 Connection头proxy_http_version 1.1;proxy_set_header Host $host; //确保后端服务器接收到的 Host 值与客户端原始请求的 Host 一致,或符合后端服务器的预期。proxy_pass http://xxx:port/sse创建路径;
}

3、正常情况,链接保持了40分钟,还正常

4、并发数问题

        因为这里使用的http,所以版本是HTTP/1.1,同一个端口并发sse,只有6个,有两种解决方案,后期使用HTTP/2.0,默认100并发,满足要求。

http://www.lryc.cn/news/2393161.html

相关文章:

  • python中使用高并发分布式队列库celery的那些坑
  • 哈工大计算机系统大作业 程序人生-Hello’s P2P
  • 计算机一次取数过程分析
  • Halcon联合QT ROI绘制
  • 力扣面试150题--二叉树的右视图
  • 数据绑定页面的完整的原理、逻辑关系、实现路径是什么?页面、表格、字段、属性、值、按钮、事件、模型、脚本、服务编排、连接器等之间的关系又是什么?
  • 江西某石灰石矿边坡自动化监测
  • 《Python 应用中的蓝绿部署与滚动更新:持续集成中的实践与优化》
  • C# 类和继承(所有类都派生自object类)
  • 02业务流程的定义
  • cursor rules设置:让cursor按执行步骤处理(分析需求和上下文、方案对比、确定方案、执行、总结)
  • Linux操作系统之进程(四):命令行参数与环境变量
  • Typora-macOS 风格代码块
  • 如何迁移SOS数据库和修改sos服务的端口号
  • ansible自动化playbook简单实践
  • 20250526惠普HP锐14 AMD锐龙 14英寸轻薄笔记本电脑(八核R7-7730U)的显卡驱动下载
  • WIN11使用vscode搭建c语言开发环境
  • 2025年5月蓝桥杯stema省赛真题——象棋移动
  • AI重构SEO关键词精准定位
  • C++ 模板元编程语法大全
  • SPSS跨域分类:自监督知识+软模板优化
  • 【术语扫盲】BSP与MSP
  • vscode的Embedded IDE创建keil项目找不到源函数或者无法跳转
  • HTTP/2与HTTP/3特性详解:为你的Nginx/Apache服务器开启下一代Web协议
  • 构建高效智能客服系统的8大体验设计要点
  • CppCon 2014 学习:Making C++ Code Beautiful
  • 副本(Replica)在Elasticsearch中扮演什么角色?
  • 据传苹果将在WWDC上发布iOS 26 而不是iOS 19
  • 整理了Windows(7—11)官方镜像下载链接和各版本区别介绍
  • 数据库主键与索引详解