当前位置: 首页 > news >正文

简单的springboot使用sse功能

什么是sse?

1、SSE 是Server-Sent Events(服务器发送事件)

2、SSE是一种允许服务器主动向客户端推送实时更新的技术。

3、它基于HTTP协议,并使用了其长连接特性,在客户端与服务器之间建立一条持久化的连接。 通过这条连接,服务器可以实时地向客户端发送事件流,而客户端可以监听这些事件并作出相应的处理。

4、SSE是单向通信机制,即只能由服务器向客户端推送数据,客户端不能通过SSE向服务器发送数据。

5、SSE在现代浏览器和移动设备上得到了广泛的支持,是实现实时Web应用的一种有效方式。

使用流程(经测试,此方式不会丢失消息,靠谱能用!

1、引入springboot的web基本依赖,这里不细说

2、controller中

 /*** 订阅sse消息** @return*/@CrossOrigin@RequestMapping(path = "/subscribe/{userId}")public SseEmitter subscribe(@PathVariable String userId) {// 设置超时时间,0表示不过期。默认30秒,超过时间未完成会抛出异常:AsyncRequestTimeoutExceptionreturn SSEServer.connect(userId);}

3、SSEServer类

package com.orison.controller;import lombok.extern.slf4j.Slf4j;
import org.springframework.http.MediaType;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;/*** @ClassName SSEServer* @Description TODO* @Author xiaoli* @Date 2022-10-26 18:00* @Version 1.0**/
@Slf4j
public class SSEServer {/*** 当前连接数*/private static AtomicInteger count = new AtomicInteger(0);private static Map<String, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();public static SseEmitter connect(String userId){//设置超时时间,0表示不过期,默认是30秒,超过时间未完成会抛出异常SseEmitter sseEmitter = new SseEmitter(0L);//注册回调sseEmitter.onCompletion(completionCallBack(userId));sseEmitter.onError(errorCallBack(userId));sseEmitter.onTimeout(timeOutCallBack(userId));sseEmitterMap.put(userId,sseEmitter);//数量+1count.getAndIncrement();log.info("create new sse connect ,current user:{}",userId);return sseEmitter;}/*** 给指定用户发消息*/public static void sendMessage(String userId, String message){if(sseEmitterMap.containsKey(userId)){try{sseEmitterMap.get(userId).send(message);}catch (IOException e){log.error("user id:{}, send message error:{}",userId,e.getMessage());log.error("Exception:",e);}}}/*** 想多人发送消息,组播*/public static void groupSendMessage(String groupId, String message){if(sseEmitterMap!=null&&!sseEmitterMap.isEmpty()){sseEmitterMap.forEach((k,v) -> {try{if(k.startsWith(groupId)){v.send(message, MediaType.APPLICATION_JSON);}}catch (IOException e){log.error("user id:{}, send message error:{}",groupId,message);removeUser(k);}});}}public static void batchSendMessage(String message) {sseEmitterMap.forEach((k,v)->{try{v.send(message,MediaType.APPLICATION_JSON);}catch (IOException e){log.error("user id:{}, send message error:{}",k,e.getMessage());removeUser(k);}});}/*** 群发消息*/public static void batchSendMessage(String message, Set<String> userIds){userIds.forEach(userId->sendMessage(userId,message));}public static void removeUser(String userId){sseEmitterMap.remove(userId);//数量-1count.getAndDecrement();log.info("remove user id:{}",userId);}public static List<String> getIds(){return new ArrayList<>(sseEmitterMap.keySet());}public static int getUserCount(){return count.intValue();}private static Runnable completionCallBack(String userId) {return () -> {log.info("结束连接,{}",userId);removeUser(userId);};}private static Runnable timeOutCallBack(String userId){return ()->{log.info("连接超时,{}",userId);removeUser(userId);};}private static Consumer<Throwable> errorCallBack(String userId){return throwable -> {log.error("连接异常,{}",userId);removeUser(userId);};}
}

 

4、前端

<script>function createEventSource() {const eventSource = new EventSource('http://localhost:13330/device/cameraDevice/subscribe/'+getRandomString(5));eventSource.onmessage = function(event) {console.log("sse连接中");if (event.data){console.log(event);//这里就是请求streamEvents接口返回的值,此时就可以通过Ajax展示出来了}};eventSource.onerror = function(event) {console.error("sse连接失败,每5秒尝试重新连接");// 关闭当前 EventSource 实例eventSource.close();// 尝试在 5 秒后重新连接(可以根据需要调整重连间隔)setTimeout(createEventSource, 5000);};return eventSource;}// 初始化 EventSource 连接createEventSource();function getRandomString(len) {const _charStr = 'abacdefghjklmnopqrstuvwxyzABCDEFGHJKLMNOPQRSTUVWXYZ0123456789';let min = 0, max = _charStr.length - 1, _str = '';//判断是否指定长度,否则默认长度为15len = len || 15;//循环生成字符串for (var i = 0, index; i < len; i++) {index = RandomIndex(min, max, i);_str += _charStr[index];}return _str;}/*** 随机生成索引* @param min 最小值* @param max 最大值* @param i 当前获取位置*/function RandomIndex(min, max, i) {const _charStr = 'abacdefghjklmnopqrstuvwxyzABCDEFGHJKLMNOPQRSTUVWXYZ0123456789';let index = Math.floor(Math.random() * (max - min + 1) + min),numStart = _charStr.length - 10;//如果字符串第一位是数字,则递归重新获取if (i == 0 && index >= numStart) {index = RandomIndex(min, max, i);}//返回最终索引值return index;}</script>

http://www.lryc.cn/news/496779.html

相关文章:

  • 【服务器问题】xshell 登录远程服务器卡住( 而 vscode 直接登录不上)
  • AI×5G 市场前瞻及应用现状
  • 利用 Redis 与 Lua 脚本解决秒杀系统中的高并发与库存超卖问题
  • 【MySQL】创建数据库、用户和密码
  • leetcode hot100【Leetcode 72.编辑距离】java实现
  • 腾讯阅文集团Java后端开发面试题及参考答案
  • protobuf实现Hbase数据压缩
  • 论文阅读之方法: Single-cell transcriptomics of 20 mouse organs creates a Tabula Muris
  • PHP语法学习(第三天)
  • PostgreSQL添加PostGIS扩展和存储坐标
  • Flink四大基石之State(状态) 的使用详解
  • Linux中dos2unix详解
  • MySQL MVCC 介绍
  • Linux篇之日志管理工具Logrotate介绍并结合crontab使用
  • Vulnhub靶场 Matrix-Breakout: 2 Morpheus 练习
  • 秒杀项目 超卖问题 详解
  • Linux系统编程之进程控制
  • 集合的相关性质与定义
  • pytest自定义命令行参数
  • c++预编译头文件
  • YOLOv8模型pytorch格式转为onnx格式
  • 电子课程开发中的典型误区
  • Docker 逃逸突破边界
  • 残差连接,就是当某一偏导等于0时,加上x偏导就是1,这样乘以1保证不失效
  • 博泽Brose EDI项目案例
  • 从科举到高考,人才选拔制度的变革与发展
  • 利用Docker一键发布Nginx-Tomcat-MySQL应用集群
  • 关于数据库数据国际化方案
  • 【系统架构设计师】高分论文:论信息系统的安全与保密设计
  • 使用Tauri创建桌面应用