一、SSE技术基础
1. SSE核心特性
特性 | 说明 |
---|
协议 | 基于HTTP的单向通信协议 |
数据格式 | text/event-stream |
重连机制 | 自动重连(默认3秒) |
事件类型 | 支持自定义事件类型 |
浏览器支持 | 除IE外主流浏览器均支持 |
2. 与WebSocket对比
维度 | SSE | WebSocket |
---|
协议方向 | 单向(服务端→客户端) | 双向 |
协议复杂度 | 简单(纯HTTP) | 复杂(独立协议) |
断线恢复 | 内置自动恢复 | 需手动实现 |
数据格式 | 文本为主 | 二进制/文本 |
适用场景 | 服务端推送场景 | 双向交互场景 |
二、Spring SSE实现方案
1. 基础控制器实现
@RestController
@RequestMapping("/api/monitor")
public class MonitorController {private final Map<String, SseEmitter> emitters = new ConcurrentHashMap<>();@GetMapping(value = "/subscribe", produces = MediaType.TEXT_EVENT_STREAM_VALUE)public SseEmitter subscribe(@RequestParam String clientId) {SseEmitter emitter = new SseEmitter(60_000L); emitters.put(clientId, emitter);emitter.onCompletion(() -> emitters.remove(clientId));emitter.onTimeout(() -> emitters.remove(clientId));emitter.onError(e -> {log.error("SSE error", e);emitters.remove(clientId);});sendInitialData(emitter);return emitter;}private void sendInitialData(SseEmitter emitter) {try {emitter.send(SseEmitter.event().name("system-status").data(getCurrentStatus()).reconnectTime(5000L));} catch (IOException e) {emitter.completeWithError(e);}}
}
2. 监控数据推送服务
@Service
@RequiredArgsConstructor
public class MonitorPushService {private final SimpMessagingTemplate messagingTemplate;@Scheduled(fixedRate = 1000)public void pushSystemMetrics() {SystemMetrics metrics = collectMetrics();messagingTemplate.convertAndSend("/topic/system-metrics", metrics);messagingTemplate.convertAndSend("/queue/metrics", metrics);}private SystemMetrics collectMetrics() {return new SystemMetrics(ManagementFactory.getOperatingSystemMXBean().getSystemLoadAverage(),Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory(),ManagementFactory.getThreadMXBean().getThreadCount());}
}
三、前端集成方案
1. 原生JavaScript实现
const eventSource = new EventSource('/api/monitor/subscribe?clientId=' + uuidv4());eventSource.addEventListener('system-status', (e) => {const data = JSON.parse(e.data);updateCpuChart(data.cpuUsage);updateMemoryChart(data.memoryUsage);
});eventSource.addEventListener('error', (e) => {if (e.readyState === EventSource.CLOSED) {console.log('Connection closed');} else {console.error('EventSource error:', e);}
});
eventSource.onerror = () => {eventSource.close();setTimeout(() => {new EventSource(eventSource.url);}, 10000);
};
2. React组件封装
import { useEffect, useState } from 'react';const SystemMonitor = () => {const [metrics, setMetrics] = useState({});useEffect(() => {const eventSource = new EventSource('/api/monitor/subscribe');eventSource.onmessage = (e) => {setMetrics(JSON.parse(e.data));};return () => eventSource.close();}, []);return (<div className="monitor-dashboard"><CpuGauge value={metrics.cpuLoad} /><MemoryChart data={metrics.memory} /><ThreadCounter count={metrics.threadCount} /></div>);
};
四、监控数据类型处理
1. 多维度监控数据模型
@Data
@Builder
public class SystemMetrics {private double cpuLoad;private long usedMemory;private long maxMemory;private int threadCount;private long gcCount;private long gcTime;private int activeSessions;private double requestRate;private double errorRate;private Map<String, Object> customMetrics;
}
2. 数据分片推送策略
public void pushDetailedMetrics(SseEmitter emitter, SystemMetrics metrics) {try {emitter.send(SseEmitter.event().name("cpu-metrics").data(metrics.getCpuLoad()));emitter.send(SseEmitter.event().name("memory-metrics").data(Map.of("used", metrics.getUsedMemory(),"max", metrics.getMaxMemory())));emitter.send(SseEmitter.event().name("other-metrics").data(Map.of("threads", metrics.getThreadCount(),"sessions", metrics.getActiveSessions())));} catch (IOException e) {emitter.completeWithError(e);}
}
五、高级功能实现
1. 动态订阅控制
@PostMapping("/subscribe/custom")
public SseEmitter subscribeCustomMetrics(@RequestBody MetricSubscription subscription) {SseEmitter emitter = new SseEmitter();ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();ScheduledFuture<?> future = executor.scheduleAtFixedRate(() -> {if (!emitter.isComplete()) {Map<String, Object> data = new HashMap<>();subscription.getMetrics().forEach(metric -> {data.put(metric, getMetricValue(metric));});try {emitter.send(data);} catch (IOException e) {emitter.completeWithError(e);}}}, 0, subscription.getInterval(), TimeUnit.SECONDS);emitter.onCompletion(() -> {future.cancel(true);executor.shutdown();});return emitter;
}
2. 历史数据回放
@GetMapping("/replay")
public SseEmitter replayHistoricalData(@RequestParam Instant from,@RequestParam Instant to,@RequestParam(defaultValue = "1") int speed) {SseEmitter emitter = new SseEmitter();List<SystemMetrics> history = metricStore.queryRange(from, to);new Thread(() -> {try {for (SystemMetrics metrics : history) {if (emitter.isComplete()) break;emitter.send(metrics);Thread.sleep(1000 / speed);}emitter.complete();} catch (Exception e) {emitter.completeWithError(e);}}).start();return emitter;
}
六、安全增强方案
1. 认证集成
@GetMapping("/secure/subscribe")
public SseEmitter secureSubscribe(@AuthenticationPrincipal User user) {SseEmitter emitter = new SseEmitter();if (!user.hasRole("MONITOR_VIEWER")) {emitter.completeWithError(new AccessDeniedException("Forbidden"));return emitter;}scheduledExecutor.scheduleAtFixedRate(() -> {try {emitter.send(filterByPermission(user, getMetrics()));} catch (IOException e) {emitter.completeWithError(e);}}, 0, 1, TimeUnit.SECONDS);return emitter;
}
2. 流量控制
@Bean
public WebMvcConfigurer sseConfigurer() {return new WebMvcConfigurer() {@Overridepublic void configureAsyncSupport(AsyncSupportConfigurer configurer) {configurer.registerDeferredResultInterceptors(new SseRateLimitInterceptor(10));}};
}public class SseRateLimitInterceptor implements DeferredResultProcessingInterceptor {private final ConcurrentMap<String, AtomicInteger> ipCounts = new ConcurrentHashMap<>();private final int maxPerIp;@Overridepublic <T> void beforeConcurrentHandling(NativeWebRequest request, DeferredResult<T> deferredResult) {String ip = request.getRemoteAddr();int count = ipCounts.computeIfAbsent(ip, k -> new AtomicInteger()).incrementAndGet();if (count > maxPerIp) {throw new TooManyRequestsException("SSE connection limit exceeded");}deferredResult.onCompletion(() -> {ipCounts.get(ip).decrementAndGet();});}
}
七、性能优化策略
1. 服务端配置优化
server:tomcat:max-threads: 200max-connections: 10000compression:enabled: truemime-types: text/event-streammin-response-size: 1024spring:mvc:async:request-timeout: 60000
2. 客户端优化建议
- 指数退避重连:连接失败时逐步增加重试间隔
- 数据压缩:启用SSE数据压缩
- 按需订阅:只订阅必要的数据类型
- 心跳检测:定期检查连接状态
- 数据聚合:客户端本地缓存和聚合数据
class SmartEventSource {constructor(url) {this.url = url;this.retryDelay = 1000;this.maxRetryDelay = 60000;this.connect();}connect() {this.es = new EventSource(this.url);this.es.onopen = () => {this.retryDelay = 1000; };this.es.onerror = () => {this.es.close();this.retryDelay = Math.min(this.retryDelay * 2, this.maxRetryDelay);setTimeout(() => this.connect(), this.retryDelay);};}
}
通过Spring SSE构建的实时监控系统,结合上述优化策略和安全方案,可以实现高性能、高可用的监控数据推送服务,满足从基础系统监控到复杂业务指标展示的各类场景需求。