当前位置: 首页 > news >正文

StringBoot-SSE和WebFlux方式消息实时推送-默认单向-可增加交互接口

1. 效果演示

效果演示
最新版本更新
https://code.jiangjiesheng.cn/article/376?from=csdn

推荐 《高并发 & 微服务 & 性能调优实战案例100讲 源码下载》

2. 共同需要

WebMvcConfig.java

package cn.jiangjiesheng.config;import cn.hutool.json.JSONUtil;
import com.envtoday.ecp.datav.common.ResponseBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.web.HttpMediaTypeNotAcceptableException;
import org.springframework.web.servlet.HandlerExceptionResolver;
import org.springframework.web.servlet.ModelAndView;
import org.springframework.web.servlet.config.annotation.AsyncSupportConfigurer;
import org.springframework.web.servlet.config.annotation.CorsRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;import javax.servlet.http.HttpServletResponse;
import java.util.List;
import java.util.concurrent.ThreadPoolExecutor;@Configuration
public class WebMvcConfig implements WebMvcConfigurer {@Overridepublic void addCorsMappings(CorsRegistry registry) {registry.addMapping("/**");}//SseWebFluxController方案必要@Overridepublic void configureAsyncSupport(AsyncSupportConfigurer configurer) {configurer.setTaskExecutor(mvcTaskExecutor());configurer.setDefaultTimeout(30000); // 可选:设置默认超时(毫秒)}//SseWebFluxController方案必要@Beanpublic AsyncTaskExecutor mvcTaskExecutor() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();executor.setThreadNamePrefix("sse-task-");executor.setCorePoolSize(10);executor.setMaxPoolSize(50);executor.setQueueCapacity(100);executor.setKeepAliveSeconds(60);executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());executor.initialize();return executor;}//SseWebFluxController || SseEmitterController方案都必要@Overridepublic void configureHandlerExceptionResolvers(List<HandlerExceptionResolver> resolvers) {resolvers.add((request, response, handler, ex) -> {//这是自定义的业务返回类ResponseBean<Object> result = new ResponseBean<>();// 处理event-stream接口断开时打印"java.io.IOException: 你的主机中的软件中止了一个已建立的连接。"(浏览器窗口关闭或刷新)boolean isSseRequest = MediaType.TEXT_EVENT_STREAM_VALUE.equals(request.getHeader("Accept"));if (isSseRequest) {String msg = ex.getMessage();if (msg != null) {msg = msg.toLowerCase();if (msg.contains("reset by peer") ||msg.contains("aborted") ||msg.contains("broken pipe") ||msg.contains("软件中止") ||//后面这两个都是HttpMediaTypeNotAcceptableException 判断,本质上是MediaType.TEXT_EVENT_STREAM_VALUE的头却返回了error页面ex instanceof HttpMediaTypeNotAcceptableException || msg.contains("could not find acceptable representation")) {return writeResponse(HttpStatus.OK.value(), "沉默处理这个错误", response, result);}}// 其他 IOException 不处理,交给后续 resolverreturn null;} else {// 其他特殊处理// ex instanceof xxException}//如果不想有当前方法处理异常,那就返回nullreturn writeResponse(HttpStatus.INTERNAL_SERVER_ERROR.value(), ex.getMessage(), response, result);});}private ModelAndView writeResponse(int code, String msg, HttpServletResponse response, ResponseBean<Object> result) {result.setCode(code);result.setMsg(msg);//开始写返回response.setCharacterEncoding("UTF-8");response.setHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON.toString());//实际可以直接写200response.setStatus(code == HttpStatus.OK.value() ? HttpStatus.OK.value() : HttpStatus.INTERNAL_SERVER_ERROR.value());try {response.getWriter().write(JSONUtil.toJsonStr(result));} catch (IllegalStateException ignore) {// 客户端中断类的不要打印} catch (Exception e) {System.out.println("responseResult出错了:" + e.getMessage());}return new ModelAndView();}}

test-sse.html

<!DOCTYPE html>
<html lang="en">
<head><title>Robust SSE Client</title><meta charset="UTF-8"><style>body {font-family: Arial, sans-serif;margin: 20px;}#status {padding: 10px;margin: 10px 0;border-radius: 4px;}.connected {background-color: #d4edda;color: #155724;}.disconnected {background-color: #f8d7da;color: #721c24;}.reconnecting {background-color: #fff3cd;color: #856404;}#messages {border: 1px solid #ccc;padding: 10px;height: 400px;overflow-y: auto;}.message {margin: 5px 0;padding: 5px;background: #f0f0f0;border-radius: 3px;}</style>
</head>
<body>
<h1>📡 Server-Sent Events (SSE) Client</h1><div id="status" class="disconnected">Status: Disconnected</div>
<div id="dataStatus" class="connected">dataStatus: 等待连接</div>
<div class="connected">说明: 列表最多加载50条,避免dom结构崩溃,支持滚动列表暂停(列表长度超过窗口),并在滚动到最后或者离开列表区域后加载新数据</div>
<div class="connected">\t 如果需要交互,需要新增一个接口,带上clientId和业务参数</div>
<div id="messages"></div><script>// 生成客户端唯一标识(可用 UUID,简单示例用时间戳+随机数)// 生成客户端唯一标识(可用 UUID,简单示例用时间戳+随机数)const clientId = 'client-' + Date.now() + '-' + Math.random().toString(36).slice(2, 11);console.log('Client ID:', clientId);//const url = urlConstant.url_SSE + `?clientId=${encodeURIComponent(clientId)}`;// 创建带 clientId 的连接(和服务端配合)// 两种后端方案://const url = `http://192.168.4.143:8803/datav/sse-stream-emitter?clientId=${encodeURIComponent(clientId)}`;const url = `http://192.168.4.143:8803/datav/sse-stream-webFlux?clientId=${encodeURIComponent(clientId)}`;let eventSource = null;let reconnectTimer = null;let reconnectAttempts = 0;const MAX_RECONNECT_ATTEMPTS = 100; // 最大重试次数(可选)const BASE_RECONNECT_DELAY = 3000; // 初始重连延迟(3秒)const MAX_RECONNECT_DELAY = 30000; // 最大延迟(30秒)const messagesDiv = document.getElementById('messages');let isUserInView = false;        // 鼠标是否在 messagesDiv 内let isUserManuallyScrolledUp = false; // 用户是否手动向上滚动(未到底)// 暂存新消息let pendingMessages = [];function connect() {// 避免重复创建if (eventSource) {eventSource.close();}// 显示连接中状态updateStatus('Connecting...', 'reconnecting');updateDataStatus("等待数据...")eventSource = new EventSource(url);eventSource.onopen = function (event) {console.log('SSE connection opened');updateStatus(`Connected [ID: ${clientId}]`, 'connected');updateDataStatus("等待连接...")reconnectAttempts = 0; // 重置重试计数};//后端没有指定event,处理 普通数据流,比如日志、聊天消息、实时价格等【本实】eventSource.onmessage = function (event) {const data = event.data;const time = new Date().toLocaleTimeString();const message = document.createElement('div');message.className = 'message';message.textContent = `[${time}] ${data}`;addToMessageDiv(message)};// 后端指定event:处理 特定类型的通知,比如告警、系统事件、用户提醒等eventSource.addEventListener('custom-message', function (event) {const data = event.data;const time = new Date().toLocaleTimeString();const message = document.createElement('div');message.className = 'message';message.style.backgroundColor = '#cce5ff';message.textContent = `[${time}] 🔔 Custom: ${data}`;addToMessageDiv(message)});// 监听特定事件类型(如 heartbeat, custom-message)eventSource.addEventListener('heartbeat', function (event) {//发布时不要打印//console.log('SSE Heartbeat received:', event.data);});eventSource.onerror = function (event) {console.error('SSE Error:', event);// readyState: 0=CONNECTING, 1=OPEN, 2=CLOSEDif (eventSource.readyState === EventSource.CLOSED) {console.log('Connection closed. Attempting to reconnect...');scheduleReconnect();} else if (eventSource.readyState === EventSource.CONNECTING) {updateStatus('Reconnecting...', 'reconnecting');}};//当鼠标在messagesDiv 时,就不要滚动initMouseEnterOrScroll();}function scheduleReconnect() {if (reconnectAttempts >= MAX_RECONNECT_ATTEMPTS) {updateStatus('Max retries reached. Stopped.', 'disconnected');return;}reconnectAttempts++;const delay = Math.min(BASE_RECONNECT_DELAY * Math.pow(1.5, reconnectAttempts), MAX_RECONNECT_DELAY);updateStatus(`Reconnecting in ${delay / 1000}s... (Attempt ${reconnectAttempts})`, 'reconnecting');clearTimeout(reconnectTimer);reconnectTimer = setTimeout(() => {connect();}, delay);}function updateStatus(text, className) {const statusDiv = document.getElementById('status');statusDiv.textContent = 'Status: ' + text;statusDiv.className = ''; // 清除旧类statusDiv.classList.add(className);}function updateDataStatus(text) {const dataStatusDiv = document.getElementById('dataStatus');dataStatusDiv.textContent = 'dataStatus: ' + text;}// 页面加载完成后连接// 通用跨浏览器事件绑定工具function on(element, event, handler) {if (element.addEventListener) {element.addEventListener(event, handler, false);} else if (element.attachEvent) {element.attachEvent('on' + event, handler);}}//当鼠标在messagesDiv 时,就不要滚动function initMouseEnterOrScroll() {if (!messagesDiv) {console.error('未找到 id 为 "messages" 的元素');}// 使用 on() 绑定事件,兼容老浏览器if (messagesDiv) {on(messagesDiv, 'mouseenter', function () {isUserInView = true;console.log("🖱️ 进入消息区域");updateDataStatus("鼠标进入消息区域,暂定滚动,缓存消息")});on(messagesDiv, 'mouseleave', function () {isUserInView = false;console.log("🖱️ 离开消息区域");updateDataStatus("鼠标离开消息区域,恢复滚动,释放消息")// 如果离开时有暂存消息,立即释放if (pendingMessages.length > 0) {flushPendingMessages();}});// 可选:监听滚动,智能判断是否恢复自动滚动on(messagesDiv, 'scroll', function () {updateScrollState();// 如果回到底部,释放暂存消息if (!isUserManuallyScrolledUp && pendingMessages.length > 0) {flushPendingMessages();}});// ========== 初始化 ==========updateScrollState();}}function addToMessageDiv(message) {// 🔴 关键判断:只要用户在区域内 且 不在底部,就暂存if (isUserInView && isUserManuallyScrolledUp) {console.log("📌 消息暂存:用户在查看历史");updateDataStatus("消息暂存:用户在查看历史")pendingMessages.push(message);//showNewMessageBadge(pendingMessages.length);return;}// 添加新消息messagesDiv.appendChild(message);console.log("✅ 直接添加消息");updateDataStatus("直接添加消息")keepMaxRowInMessageDiv();// 滚动到底部messagesDiv.scrollTop = messagesDiv.scrollHeight;}function keepMaxRowInMessageDiv() {// 限制最大消息数量为 50 避免页面无限堆积、影响性能const maxMessages = 50;const messageElements = messagesDiv.querySelectorAll('.message');if (messageElements.length > maxMessages) {// 移除最早的一条(即第一个 .message 元素)messagesDiv.removeChild(messageElements[0]);}}// ========== 滚动判断函数 ==========function updateScrollState() {const isAtBottom =messagesDiv.scrollHeight - messagesDiv.clientHeight <= messagesDiv.scrollTop + 10;isUserManuallyScrolledUp = !isAtBottom;}// ========== 释放所有暂存消息 ==========function flushPendingMessages() {if (pendingMessages.length === 0) return;console.log(`⬇️ 释放 ${pendingMessages.length} 条暂存消息`);updateDataStatus(`释放 ${pendingMessages.length} 条暂存消息`)pendingMessages.forEach(msg => {messagesDiv.appendChild(msg);keepMaxRowInMessageDiv();});// 清空暂存pendingMessages = [];//hideNewMessageBadge();// 强制滚动到底部messagesDiv.scrollTop = messagesDiv.scrollHeight;}// 页面加载完成后连接on(window, 'load', function () {connect();});// 页面卸载时关闭连接on(window, 'beforeunload', function () {if (eventSource) {eventSource.close();}if (reconnectTimer) {clearTimeout(reconnectTimer);}});
</script>
</body>
</html>

3. 两种方案

3.1 webflux+reactor
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency><groupId>io.projectreactor</groupId><artifactId>reactor-core</artifactId><version>3.5.0</version>
</dependency>

SseWebFluxController.java

package cn.jiangjiesheng.controller;import org.springframework.http.MediaType;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import reactor.core.publisher.SignalType;
import reactor.core.publisher.Sinks;import javax.annotation.PreDestroy;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;@RestController
public class SseWebFluxController {// 使用 ConcurrentHashMap 存储每个客户端的标识和其 FluxSink(可选:用于反向推送)private final Map<String, Sinks.Many<ServerSentEvent<String>>> clientSinks = new ConcurrentHashMap<>();private final ScheduledExecutorService scheduler =Executors.newScheduledThreadPool(10); // 可根据负载调整线程数private static volatile boolean hasInvokeMock = false;// /datav/sse-stream@GetMapping(value = "/sse-stream-webFlux", produces = MediaType.TEXT_EVENT_STREAM_VALUE)public Flux<ServerSentEvent<String>> streamEvents(@RequestParam String clientId) {// 最多缓存 100 条消息 防止浏览器挤压Sinks.Many<ServerSentEvent<String>> sink = Sinks.many().multicast().onBackpressureBuffer(100);// 注册 sinkclientSinks.put(clientId, sink);// 定时数据流Flux<ServerSentEvent<String>> intervalFlux = Flux.interval(Duration.ofSeconds(5))//.takeWhile(ignored -> clientSinks.containsKey(clientId)).map(sequence -> ServerSentEvent.<String>builder().id(clientId + "-" + sequence).data("定时数据流模拟[无event]: SSE from WebFlux - " + LocalDateTime.now() + " [Client: " + clientId + "]").build());// 心跳Flux<ServerSentEvent<String>> heartbeat = Flux.interval(Duration.ofSeconds(5))//.takeWhile(ignored -> clientSinks.containsKey(clientId)).map(seq -> ServerSentEvent.<String>builder().event("heartbeat").data("ping").build());// 合并流Flux<ServerSentEvent<String>> serverSentEventFlux = Flux.merge(intervalFlux, heartbeat, sink.asFlux()).doOnSubscribe(sub -> System.out.println("✅ 客户端连接: " + clientId)).doOnCancel(() -> {String threadName = Thread.currentThread().getName();System.out.println("🔌 客户端取消连接: " + clientId + " | 线程: " + threadName);// 判断是否为 sse-task 线程
//                    if (threadName.startsWith("sse-task-")) {
//                        System.out.println("⚠️ 取消发生在 sse-task 线程!可能并发清理!");
//                    } else if (threadName.startsWith("http-nio-")) {
//                        System.out.println("✅ 取消发生在请求线程,安全。");
//                    }//cleanupClient(clientId);}).doFinally(signalType -> {String threadName = Thread.currentThread().getName();System.out.println("🛑 doFinally 触发 [信号: " + signalType + "] | 客户端: " + clientId + " | 线程: " + threadName);// signalType 可能是:// - ON_COMPLETE// - ON_ERROR// - CANCELif (signalType == SignalType.CANCEL || signalType == SignalType.ON_ERROR || signalType == SignalType.ON_COMPLETE) {cleanupClient(clientId);}}).doOnTerminate(() -> {System.out.println("🛑 流终止: " + clientId);cleanupClient(clientId);}).doOnError(err -> {System.err.println("❌ 流错误 for " + clientId + ": " + err.getMessage());cleanupClient(clientId);}).onErrorResume(throwable -> Flux.empty());//模拟定时任务mockSendBizDate();System.out.println("🧹 清理客户端资源: 当前线程:" + Thread.currentThread().getName() + "]-" + clientId);return serverSentEventFlux;}private void mockSendBizDate() {if (!hasInvokeMock) {synchronized (SseWebFluxController.class) {if (!hasInvokeMock) {scheduler.scheduleWithFixedDelay(() -> {// 创建副本,避免遍历时修改 mapList<String> clients = new ArrayList<>(clientSinks.keySet());for (String clientId : clients) {try {pushMessageToClient(clientId, "custom-message", "具体业务event mock " + LocalDateTime.now() + " [Client: " + clientId + "]");} catch (Exception e) {System.err.println("定时任务推送异常: " + e.getMessage());// pushMessageToClient 内部会处理清理}}}, 1, 2, TimeUnit.SECONDS);hasInvokeMock = true;}}}}private void cleanupClient(String clientId) {Sinks.Many<ServerSentEvent<String>> removed = clientSinks.remove(clientId);if (removed != null) {removed.tryEmitComplete(); // 通知 sink 完成System.out.println("🧹 清理客户端资源: 当前线程:[" + Thread.currentThread().getName() + "]-" + clientId);}}// 示例:外部服务可以调用此方法向指定客户端推送消息public void pushMessageToClient(String clientId, String event, String eventData) {Sinks.Many<ServerSentEvent<String>> sink = clientSinks.get(clientId);if (sink == null) {System.out.println("⏭️ 客户端不存在或已断开: " + clientId);return;}Sinks.EmitResult result = sink.tryEmitNext(ServerSentEvent.<String>builder().event(event).data(eventData).build());switch (result) {case OK:// 发送成功break;case FAIL_TERMINATED:case FAIL_CANCELLED:case FAIL_ZERO_SUBSCRIBER:// 这些状态表明 sink 已无效,应清理System.err.println("❌ 推送失败 [" + clientId + "],原因: " + result + ",正在清理");cleanupClient(clientId);break;case FAIL_OVERFLOW:// 缓冲区满,可能是客户端慢或断开,可尝试清理或降级System.err.println("⚠️ 推送失败 [" + clientId + "]:缓冲区溢出 (FAIL_OVERFLOW)");// 可选:延迟后重试,或直接清理(保守做法)// 这里我们选择清理,防止堆积cleanupClient(clientId);break;case FAIL_NON_SERIALIZED:// 非法并发调用,属于代码 bugSystem.err.println("🚨 非法并发调用 tryEmitNext: " + result);// 不清理 clientSinks,但应修复调用方线程安全问题break;default:// 预防未来新增枚举值System.err.println("❓ 未知 EmitResult: " + result);break;}}@PreDestroypublic void shutdown() {// 清理所有客户端new ArrayList<>(clientSinks.keySet()).forEach(this::cleanupClient);// 关闭线程池scheduler.shutdown();try {if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) {scheduler.shutdownNow();}} catch (InterruptedException e) {scheduler.shutdownNow();Thread.currentThread().interrupt();}}}
3.1 Emitter

SseEmitterController.java

package com.envtoday.ecp.datav.controller;import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;import javax.annotation.PreDestroy;
import java.io.IOException;
import java.time.LocalDateTime;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;@RestController
public class SseEmitterController {// 存储每个客户端的 SseEmitter(用于反向推送)private final Map<String, SseEmitter> clientEmitters = new ConcurrentHashMap<>();// 全局共享的调度器,避免每个连接创建线程池private final ScheduledExecutorService scheduler =Executors.newScheduledThreadPool(3); // 可根据负载调整线程数// 防止重复启动定时任务private static volatile boolean hasInvokeMock = false;/*** SSE 流接口*/@GetMapping(value = "/sse-stream-emitter", produces = MediaType.TEXT_EVENT_STREAM_VALUE)public SseEmitter streamEvents(@RequestParam String clientId) throws IOException {// 创建 SseEmitter,设置超时时间(可选,0 表示永不过期)SseEmitter emitter = new SseEmitter(0L); // 0 = no timeout// 注册 emitterclientEmitters.put(clientId, emitter);// 设置生命周期回调(都在 http-nio 线程执行,安全)emitter.onCompletion(() -> {System.out.println("✅ onCompletion: 客户端断开或完成 | 客户端: " + clientId);cleanupClient(clientId);});emitter.onTimeout(() -> {System.out.println("⏰ onTimeout: 客户端连接超时 | 客户端: " + clientId);cleanupClient(clientId);emitter.complete();});emitter.onError(throwable -> {// Windows 系统常见断开提示:"你的主机中的软件中止了一个已建立的连接。"// Linux/Unix 常见:"Connection reset by peer" 或 "Broken pipe"String msg = throwable.getMessage();if (msg != null && (msg.contains("中止了一个已建立的连接") ||msg.contains("Connection reset by peer") ||msg.contains("Broken pipe") ||msg.contains("Connection timed out"))) {// 视为正常断开,不打印错误} else {// 其他异常,可以打印警告System.err.println("⚠️ SSE 连接异常 [非网络断开]: " + clientId + " | " + throwable.getMessage());}cleanupClient(clientId);
//            emitter.complete();});// 发送初始连接消息try {emitter.send(SseEmitter.event().name("connected").data("已连接到 SSE 服务 [Client: " + clientId + "] at " + LocalDateTime.now()));} catch (IOException e) {// 客户端可能已断开emitter.complete();cleanupClient(clientId);return emitter;}// 启动定时任务(仅首次调用)mockSendBizDate();// 启动内置定时数据流(每 5 秒)startIntervalData(clientId, emitter);// 启动心跳(每 5 秒)startHeartbeat(clientId, emitter);System.out.println("✅ 客户端连接成功: " + clientId);return emitter;}/*** 启动每 5 秒的定时数据流*/private void startIntervalData(String clientId, SseEmitter emitter) {scheduler.scheduleAtFixedRate(() -> {if (!clientEmitters.containsKey(clientId)) {return;}// 2. 检查 emitter 是否已过期或连接断开// 方法:尝试 send 一个空数据,看是否抛出异常,或者通过 onCompletion/onError 已触发// 但更主动的方式是:我们无法直接获取状态,但可以封装判断逻辑try {emitter.send(SseEmitter.event().id(clientId + "-" + System.currentTimeMillis()).data("定时数据流模拟[无event]: SSE from SseEmitter - " + LocalDateTime.now() + " [Client: " + clientId + "]"));} catch (IOException e) {// 客户端断开连接是正常行为,不打印错误日志// 可选:只在 DEBUG 级别打印// log.debug("客户端 {} 连接中断,准备清理", clientId);cleanupClient(clientId);}}, 0, 5, TimeUnit.SECONDS);// emitter.onCompletion(intervalScheduler::shutdown);}/*** 启动每 5 秒的心跳*/private void startHeartbeat(String clientId, SseEmitter emitter) {ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();scheduler.scheduleAtFixedRate(() -> {if (!clientEmitters.containsKey(clientId)) {//heartbeatScheduler.shutdown();return;}try {emitter.send(SseEmitter.event().name("heartbeat").data("ping"));} catch (IOException e) {// 静默处理断开连接cleanupClient(clientId);// heartbeatScheduler.shutdown();}}, 5, 5, TimeUnit.SECONDS);// emitter.onCompletion(heartbeatScheduler::shutdown);}/*** 模拟定时业务消息推送(每 2 秒)*/private void mockSendBizDate() {if (!hasInvokeMock) {synchronized (SseEmitterController.class) {if (!hasInvokeMock) {scheduler.scheduleWithFixedDelay(() -> {List<String> clients = new ArrayList<>(clientEmitters.keySet());for (String clientId : clients) {try {pushMessageToClient(clientId, "custom-message", "具体业务event mock " + LocalDateTime.now() + " [Client: " + clientId + "]");} catch (Exception e) {System.err.println("定时任务推送异常: " + e.getMessage());}}}, 1, 2, TimeUnit.SECONDS);hasInvokeMock = true;}}}}/*** 清理客户端资源*/private void cleanupClient(String clientId) {SseEmitter removed = clientEmitters.remove(clientId);if (removed != null) {try {removed.complete(); // 确保关闭连接} catch (Exception ignored) {}System.out.println("🧹 已清理客户端资源: " + clientId + " | 当前线程: [" + Thread.currentThread().getName() + "],时间:"+new Date());}}/*** 外部服务可调用此方法向指定客户端推送消息*/public void pushMessageToClient(String clientId, String event, String eventData) {SseEmitter emitter = clientEmitters.get(clientId);if (emitter == null) {// 客户端已断开,不打印日志(避免刷屏)return;}try {emitter.send(SseEmitter.event().name(event).data(eventData));} catch (IOException e) {// 客户端连接已断开,静默清理// System.out.println(e);cleanupClient(clientId);}}// ========================// 可选:提供管理接口// ========================/*** 获取当前连接的客户端数(用于监控)*/public int getClientCount() {return clientEmitters.size();}/*** 广播消息给所有客户端*/public void broadcast(String event, String data) {List<String> failedClients = new ArrayList<>();for (Map.Entry<String, SseEmitter> entry : clientEmitters.entrySet()) {try {entry.getValue().send(SseEmitter.event().name(event).data(data));} catch (IOException e) {failedClients.add(entry.getKey());}}// 清理失败的客户端failedClients.forEach(clientId -> {System.err.println("广播失败,清理客户端: " + clientId);cleanupClient(clientId);});}@PreDestroypublic void shutdown() {// 清理所有客户端new ArrayList<>(clientEmitters.keySet()).forEach(this::cleanupClient);// 关闭线程池scheduler.shutdown();try {if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) {scheduler.shutdownNow();}} catch (InterruptedException e) {scheduler.shutdownNow();Thread.currentThread().interrupt();}}
}

最新版本更新
https://code.jiangjiesheng.cn/article/376?from=csdn

推荐 《高并发 & 微服务 & 性能调优实战案例100讲 源码下载》

http://www.lryc.cn/news/621463.html

相关文章:

  • qt项目中解决关闭弹窗后执行主界面的信号槽时闪退问题
  • c++中的Lambda表达式详解
  • ATAM:基于场景的软件架构权衡分析法
  • 使用Docker和Miniconda3搭建YOLOv13开发环境
  • 微服务架构概述
  • docker 容器管理入门教程
  • Docker network网络管理入门教程
  • JS 解构赋值语法
  • Vue浅学
  • 0814 TCP通信协议
  • 【C#补全计划】泛型约束
  • [TryHackMe](知识学习)---基于堆栈得到缓冲区溢出
  • Vue 3 + TypeScript:package.json 示例 / 详细注释说明
  • Apache 虚拟主机配置冲突导致 404 错误的排查总结
  • 通信算法之313:FPGA中实现滑动相关消耗DSP资源及7045/7035的乘法器资源
  • redis中分布式锁的应用
  • 面试题:如何用Flink实时计算QPS
  • 解锁AI潜能:五步写出让大模型神级指令
  • 宋红康 JVM 笔记 Day01|JVM介绍
  • 嵌入式开发学习———Linux环境下网络编程学习(一)
  • 【数据分享】351个地级市农业相关数据(2013-2022)-有缺失值
  • 速通C++类型转换(代码+注释)
  • AI测试自动化:智能软件质量守护者
  • 带root权限_贝尔RG020ET-CA融合终端S905L处理器当贝纯净版刷机教程
  • ROS机器人云实践案例博客建议和范文-AI版本
  • DAY 22|算法篇——贪心四
  • linux初始化配置
  • 【Linux系统】进程的生命旅程:从创建到独立的演绎
  • vue+moment将分钟调整为5的倍数(向下取整)
  • 人工智能——卷积神经网络自定义模型全流程初识