NodeJS搭建SSE接口服务
下面是一个基于Node.js的SSE(Server-Sent Events)接口服务的实现。这个示例使用Express框架搭建服务器,支持客户端订阅事件流并实时接收服务器推送的数据。
const express = require('express');
const cors = require('cors');
const { v4: uuidv4 } = require('uuid');const app = express();
const PORT = 3000;// 中间件
app.use(cors());
app.use(express.json());// 存储客户端连接
const clients = new Map();// SSE 接口
app.get('/sse', (req, res) => {// 设置 SSE 响应头res.setHeader('Content-Type', 'text/event-stream');res.setHeader('Cache-Control', 'no-cache');res.setHeader('Connection', 'keep-alive');res.flushHeaders();// 生成客户端ID并存储连接const clientId = uuidv4();const client = {id: clientId,res};clients.set(clientId, client);// 发送初始消息sendEvent(clientId, { type: 'connected', data: { clientId } });// 处理客户端断开连接req.on('close', () => {clients.delete(clientId);console.log(`Client ${clientId} disconnected`);});
});// 向特定客户端发送事件
function sendEvent(clientId, event) {const client = clients.get(clientId);if (client) {client.res.write(`data: ${JSON.stringify(event)}\n\n`);}
}// 向所有客户端广播事件
function broadcastEvent(event) {clients.forEach(client => {client.res.write(`data: ${JSON.stringify(event)}\n\n`);});
}// 模拟定时推送数据
setInterval(() => {broadcastEvent({type: 'update',data: {timestamp: new Date().toISOString(),message: '定时更新数据'}});
}, 5000);// 手动触发推送接口
app.post('/trigger', (req, res) => {const { message } = req.body;broadcastEvent({type: 'manual',data: {timestamp: new Date().toISOString(),message}});res.json({ success: true });
});// 启动服务器
app.listen(PORT, () => {console.log(`Server running on port ${PORT}`);
});
代码说明:
-
依赖安装:
npm install express cors uuid
-
核心功能:
- SSE连接管理:使用
Map
存储所有客户端连接,每个连接有唯一ID。 - 事件发送:
sendEvent(clientId, event)
:向特定客户端发送事件。broadcastEvent(event)
:向所有客户端广播事件。
- 响应头设置:
Content-Type: text/event-stream Cache-Control: no-cache Connection: keep-alive
- SSE连接管理:使用
-
接口说明:
GET /sse
:客户端订阅SSE流的入口。POST /trigger
:手动触发向所有客户端推送消息(需提供message
字段)。
-
客户端示例:
const eventSource = new EventSource('http://localhost:3000/sse');eventSource.onmessage = (event) => {const data = JSON.parse(event.data);console.log('收到消息:', data); };eventSource.onerror = (error) => {console.error('SSE连接错误:', error); };
特性与扩展:
- 心跳机制:可添加定期发送心跳消息以保持连接。
- 错误处理:包含基本的连接断开处理。
- 事件类型:支持不同类型的事件(
connected
,update
,manual
)。 - 安全增强:可添加身份验证、权限控制等。
这个实现提供了SSE服务的基础框架,你可以根据具体需求扩展其功能,如添加用户认证、事件过滤、持久化存储等。