当前位置: 首页 > news >正文

Go语言流式输出技术实现-服务器推送事件(Server-Sent Events, SSE)

目录

  • 引言
  • 背景与技术概述
  • 实现技术细节
    • 1. HTTP 头部配置
    • 2. 事件格式与发送
    • 3. 保持连接与刷新
    • 4. 处理连接关闭
      • 4.1 使用上下文管理连接生命周期
      • 4.2 使用通道管理客户端连接
    • 5. 客户端交互
    • 6.demo
    • 7.Go转发大模型流式输出demo

引言

服务器推送事件(Server-Sent Events, SSE)是一种基于 HTTP 的单向数据流技术,允许服务器通过标准 HTTP 连接向客户端推送实时更新。SSE 使用 Content-Type: text/event-stream 头部标识响应内容为事件流,例如大模型流式输出。

背景与技术概述

SSE 是 HTML5 规范的一部分,通过 EventSource API 提供客户端支持。它的主要特点包括:

  • 单向通信: 数据仅从服务器流向客户端,无法通过同一连接反向发送。
  • 自动重连: 客户端在连接断开后会自动尝试重连。
  • 基于 HTTP: 利用现有 HTTP 基础设施,无需额外协议支持。
  • 事件格式: 事件以文本形式发送,每条事件以 data: 开头,结束于两个换行符 \n\n。

在 Go 中,SSE 的实现通常依赖标准库 net/http,也可以结合框架(如 Gin)或第三方库(如 github.com/r3labs/sse)来简化开发。

实现技术细节

1. HTTP 头部配置

服务端必须在响应中设置以下头部:

  • Content-Type: text/event-stream: 标识响应为事件流。
  • Cache-Control: no-cache: 防止浏览器缓存响应,确保实时性。
  • Connection: keep-alive: 保持连接开放,支持持续流式传输。

2. 事件格式与发送

SSE 事件必须遵循特定格式,每条事件包括以下字段:

  • data:: 事件数据,多个 data: 行会被拼接为一条消息。
  • 事件以两个换行符 \n\n 结束,表示一条事件的结束。
    例如,发送一条消息 “Hello, World!” 的格式为:
data: Hello, World!

在 Go 中,事件发送通常通过 http.ResponseWriter 实现。例如,Pascal Allen 的 Medium 文章中使用了 Gin 框架c.SSEvent(“message”, msg) 方法,而 Kelche.co 的示例直接使用 fmt.Fprintf(w, “data: %d \n\n”, rand.Intn(100)) 发送随机数。

3. 保持连接与刷新

为了实现流式输出,服务端需要保持 HTTP 连接开放,通常通过无限循环实现。在每个循环中:

  • 生成或获取事件数据。
  • 写入响应,使用 w.(http.Flusher).Flush() 立即刷新,确保数据实时发送。

例如,Kelche.co 的 randomHandler 函数每 2 秒发送一次随机数:

for{rand.Seed(time.Now().UnixNano())fmt.Fprintf(w,"data: %d \n\n", rand.Intn(100))w.(http.Flusher).Flush()time.Sleep(2* time.Second)
}

4. 处理连接关闭

客户端可能随时断开连接,服务端需检测并安全退出。
例如,可以通过检查 http.ResponseWriter 的状态或使用 Hijack 方法检测连接状态。在实际应用中,推荐使用通道(channel)或上下文(context)管理连接生命周期。

4.1 使用上下文管理连接生命周期

  • 上下文的作用: 上下文可以用来传递取消信号和截止时间。例如,当客户端断开连接时,HTTP 请求的上下文会被取消,服务器可以通过 <-ctx.Done() 检测到。

  • 关键方法:

    • context.Background():创建一个空的根上下文,通常作为父上下文。
    • context.WithCancel(parentCtx):创建一个可手动取消的上下文,cancel() 函数用于取消。
    • context.WithTimeout(parentCtx, duration):创建一个在指定时间后自动取消的上下文,适合设置 SSE 连接的超时。
    • context.WithDeadline(parentCtx, deadline):创建一个在指定截止时间后自动取消的上下文。
  • 在 SSE 中的应用:

    • 在 SSE 处理函数中,使用 ctx := r.Context() 获取 HTTP 请求的上下文。
    • 使用 select 语句监听 <-ctx.Done(),当上下文被取消时(例如客户端断开),执行清理逻辑。
    • 示例代码:
func sseHandler(w http.ResponseWriter, r *http.Request) {ctx := r.Context()for {select {case <-ctx.Done():return // 客户端断开,退出default:// 发送数据fmt.Fprintf(w, "data: message\n\n")w.(http.Flusher).Flush()time.Sleep(2 * time.Second)}}
} 

这种方式确保当客户端断开时,goroutine 可以及时退出,避免资源泄漏。

4.2 使用通道管理客户端连接

  • 通道的作用: 通道可以用来管理多个客户端的连接生命周期,例如添加新客户端、移除断开的客户端和广播消息。

  • 关键结构:

    • addClient:一个通道(如 chan *SSEClient),用于添加新客户端。
    • removeClient:一个通道(如 chan *SSEClient),用于移除断开的客户端。
    • 定义一个 SSEServer 结构体,包含:- clients:一个映射(如 map[*SSEClient]struct{}),存储所有活跃客户端。
    • 每个 SSEClient 包含一个消息通道(如 chan []byte),用于发送数据。
  • 在 SSE 中的应用:

    • 当新客户端连接时,创建一个 SSEClient,初始化其消息通道,并通过 addClient 通道通知服务器。
    • 当客户端断开时,通过 removeClient 通道通知服务器,服务器从 clients 中移除该客户端并关闭其通道。
    • 使用 sync.Mutex 保护 clients 映射的并发访问,确保线程安全。
  • 示例代码:

type SSEClient struct {ID     stringStream chan []byte
}type SSEServer struct {clients      map[*SSEClient]struct{}addClient    chan *SSEClientremoveClient chan *SSEClientmutex        sync.Mutex
}func (s *SSEServer) Run() {for {select {case client := <-s.addClient:s.mutex.Lock()s.clients[client] = struct{}{}s.mutex.Unlock()case client := <-s.removeClient:s.mutex.Lock()delete(s.clients, client)s.mutex.Unlock()close(client.Stream)}}
}

5. 客户端交互

客户端通过 EventSource API 连接到 SSE 端点。例如:

const eventSource = newEventSource("/random");
eventSource.onmessage = function(event){console.log(event.data);// 处理接收到的随机数
};

EventSource 会自动处理重连,适合需要持续更新的场景。

6.demo

package mainimport ("encoding/json""fmt""io""log""net/http""runtime/debug""time""github.com/spf13/cast"
)func main() {defer recovery()http.HandleFunc("/chat/send", Send)fmt.Println("服务器启动在 http://localhost:8080")log.Fatal(http.ListenAndServe(":8080", nil))
}func Send(w http.ResponseWriter, r *http.Request) {// 处理预检请求if r.Method == "OPTIONS" {w.WriteHeader(http.StatusOK)return}body, err := io.ReadAll(r.Body)if err != nil {http.Error(w, err.Error(), http.StatusInternalServerError)return}var params SendRequesterr = json.Unmarshal(body, &params)if err != nil {http.Error(w, err.Error(), http.StatusInternalServerError)return}demo := []string{"你好","你是谁","你是做什么的","你是怎么工作的","你是在哪座城市","你是什么星座","你是哪个国家的","你是哪个省的","你是哪个市的","你是哪个区的","你是哪个街道的","你是哪个社区的","你是哪个村的",}flusher, ok := w.(http.Flusher) // 获取流式输出器if !ok {http.Error(w, "Streaming unsupported", http.StatusInternalServerError)return}//设置headerw.Header().Set("Content-Type", "text/event-stream")w.Header().Set("Cache-Control", "no-cache")w.Header().Set("Connection", "keep-alive")// 流式输出for _, v := range demo {time.Sleep(1 * time.Second)lineData := fmt.Sprintf("data: %s\n\n", v)io.WriteString(w, lineData)flusher.Flush()}
}type SendRequest struct {Msg string `json:"msg"`
}func recovery() {if rec := recover(); rec != nil {log.Printf("Panic Panic occur")if err, ok := rec.(error); ok {log.Printf("PanicRecover Unhandled error: %v\n stack:%v", err.Error(), cast.ToString(debug.Stack()))} else {log.Printf("PanicRecover Panic: %v\n stack:%v", rec, cast.ToString(debug.Stack()))}}
}

在这里插入图片描述
执行一下命令运行:

go mod initgo mod tidygo run main.go

用postman请求localhost:8080/chat/send
在这里插入图片描述

7.Go转发大模型流式输出demo

	sendRequest.Model ="qwen-max"streamResp:=&proto.StreamResp{}qwenClient:= service.NewQwen(sendRequest)qwenClient.QwenStream(streamResp)defer streamResp.HttpResp.Body.Close()// 1. 复制下游服务的响应头for key,values:= range streamResp.HttpResp.Header {for _,value:= range values {w.Header().Add(key, value)}}// 2. 复制下游服务的状态码w.WriteHeader(streamResp.HttpResp.StatusCode)//流式输出// 确保 ResponseWriter 支持 Flusherflusher,ok:= w.(http.Flusher)if!ok {http.Error(w,"Streaming unsupported", http.StatusInternalServerError)return}// 处理流式响应scanner:= bufio.NewScanner(streamResp.HttpResp.Body)for scanner.Scan(){lineData:= scanner.Text()// 将响应数据逐步发送给客户端io.WriteString(w, lineData+"\n\n")flusher.Flush()// 刷新缓冲区}

在这里插入图片描述

在 Go 中实现 Content-Type: text/event-stream 流式输出需设置正确头部、格式化事件数据并保持连接开放。标准库和框架各有优势,开发者可根据需求选择。

  • 推荐参考以下资源深入学习:
    • 使用Go实现实时通信:基于Server-Sent Events (SSE)
    • Go 中的Server-Sent Events:一种高效的实时通信替代方案
    • Server-Sent Events (SSE) in Golang
    • Using server-sent events
http://www.lryc.cn/news/609652.html

相关文章:

  • 【银河麒麟服务器系统】自定义ISO镜像更新内核版本
  • Linux 文件与目录属性管理总结
  • Android 区块链 + CleanArchitecture + MVI 架构实践
  • IDA9.1使用技巧(安装、中文字符串显示、IDA MCP服务器详细部署和MCP API函数修改开发经验)
  • Android工程命令行打包并自动生成签名Apk
  • 服务器突然之间特别卡,什么原因?
  • ffmpeg下载windows教程
  • clickhouse 中文数据的正则匹配
  • 随笔之 ClickHouse 列式分析数据库安装注意事项及基准测试
  • 人大金仓数据库常见问题(持续更新)
  • 数据结构----排序
  • Android 15.0 启动app时设置密码锁(升级到framework层判断)
  • 《时间之隙:内存溢出》
  • 《基于电阻抗断层成像(EIT)的触觉传感器:物理模拟与机器学习的创新结合》论文解读
  • RocketMQ与Kafka 消费者组的‌重平衡操作消息顺序性对比
  • 实现建筑环境自动控制,楼宇自控技术提升舒适与安全
  • 【前端】三件套基础介绍
  • 规则方法关系抽取-笔记总结
  • Postman 四种请求体格式全解析:区别、用法及 Spring Boot 接收指南
  • 实习005 (web后端springboot)
  • 【后端】Java static 关键字详解
  • 从零开始搞定类与对象(中)
  • Matplotlib与PySide6兼容性问题及解决方案
  • open-webui pipelines报404, ‘Filter pipeline.exporter not found‘
  • 基于Express+Ejs实现带登录认证的多模块增删改查后台管理系统
  • C++ 浅谈Robin Hood Hash 算法
  • 3ds Max 渲染效率提升指南:从场景设计优化开始
  • 【0基础3ds Max】常用快捷键
  • 【Linux下Java应用自动重启守护教程】
  • 【大模型】3D因果卷积动图怎么画