LangChain4j:基于 SSE 与 Flux 的 AI 流式对话实现方案
前言:本文聚焦 SSE 与 Flux 组合,解析二者如何协作:Flux 处理后端 AI 流式输出,SSE 将片段推向前端,实现 AI 内容逐段实时展示,为流式交互提供高效解决方案。
流式对话的核心需求
流式对话(如 ChatGPT、豆包等 AI 交互)的核心诉求是 实时、流畅、低延迟地展示动态生成的内容”,具体可拆解为以下需求点:
单向持续推送
流式对话的核心数据流向是 "服务器→客户端":用户发送一次提问后,AI 模型在后端逐步生成回答(通常是逐字 / 逐句生成),需要实时将中间结果推送给客户端,而客户端在此过程中无需向服务器发送额外数据,仅需等待结果即可。低延迟的片段化传输
AI 生成回答时,实时返回已生成题目给前端,从而给用户及时的回答,而不是让前端请求一直阻塞等待,最后一起返回。
而SSE与Flux结合恰好针对性地满足了上述需求:
Flux 作为 Java 响应式编程的核心类,负责后端流式数据处理 —— 以异步数据流形式封装 AI 逐字生成的内容,支持片段化数据的逐段发射,完美适配 AI “边生成边输出” 的逻辑,是后端处理流式数据的基础工具。
SSE 则专注于网络传输层面的单向推送 —— 基于 HTTP 长连接,客户端一次连接后,服务器可持续推送数据,与 AI 对话中 “后端→客户端” 的单向数据流完全匹配,高效实现实时传输。
两者配合,Flux 在后端接收并处理 AI 生成的片段,SSE 通过长连接将这些片段实时推送给前端,最终实现 “逐字显示” 的流畅体验,在实时性与资源消耗间达到平衡,是流式对话的理想选择。
Flux技术
Flux 是 Java 响应式编程库(如 Project Reactor)中的核心类,专门用于处理异步、流式的数据序列,可以理解为 “能动态产生多个数据元素的数据流容器”。它的设计非常适合需要逐段生成、实时处理数据的场景(如 AI 流式输出、实时日志等)
在 AI 流式场景中的作用
以 AI 对话为例:
当调用 AI 模型生成回答时,模型并非一次性输出完整内容,而是逐字 / 逐句计算(类似人 “边想边说”)。这些实时生成的片段(如 “你”“好”)会被依次 “发射” 到 Flux 中,形成一个持续的数据流。后端可通过 Flux 的 map
操作将片段转换为 SSE 格式,再推送给前端,实现 “逐字显示” 的效果。
简单说,Flux 是后端 “接住” 并处理 AI 流式输出的 “传送带”,让数据能按生成顺序实时流动、加工,为后续的传输(如通过 SSE 推给前端)提供基础。
我们可以对 Flux 对象进行下列操作:
SSE 技术
基本概念
服务器发送事件(Server-Sent Events)是一种用于从服务器到客户端的 单向、实时 数据传输技术,基于 HTTP协议实现。
它有几个重要的特点:
- 单向通信:SSE 只支持服务器向客户端的单向通信,客户端不能向服务器发送数据。
- 文本格式:SSE 使用 纯文本格式 传输数据,使用 HTTP 响应的
text/event-stream
MIME 类型。 - 保持连接:SSE 通过保持一个持久的 HTTP 连接,实现服务器向客户端推送更新,而不需要客户端频繁轮询。
- 自动重连:如果连接中断,浏览器会自动尝试重新连接,确保数据流的连续性。
SSE 数据格式
SSE 数据流的格式非常简单,使用 event
指定事件名称,用于区分不同类型的消息。每个事件使用 data
字段,作为消息主体,事件以两个换行符结束。还可以使用 id
字段来标识事件,并且 retry
字段可以设置重新连接的时间间隔。
event: 事件名\n // 可选,用于区分不同类型的消息
data: 消息内容\n // 必选,消息主体(可多行,每行以 data: 开头)
id: 消息ID\n // 可选,用于客户端记录最后接收的消息ID(重连时可通过 Last-Event-ID 头传递)
retry: 重连时间(毫秒)\n // 可选,指定客户端重连间隔
\n // 空行表示一条消息结束
示例格式如下:
data: Third message\n
id: 3\n
\n
retry: 10000\n
data: Fourth message\n
\n
SSE 与 Flux 的结合实现AI输出流式对话
后端代码
@GetMapping(value = "/chat/gen/code", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<String>> chatToGenCode(@RequestParam Long appId,@RequestParam String message,HttpServletRequest request) {// 参数校验ThrowUtils.throwIf(appId == null || appId <= 0, ErrorCode.PARAMS_ERROR, "应用ID无效");ThrowUtils.throwIf(StrUtil.isBlank(message), ErrorCode.PARAMS_ERROR, "用户消息不能为空");// 获取当前登录用户User loginUser = userService.getLoginUser(request);// 调用服务生成代码(流式)Flux<String> contentFlux = appService.chatToGenCode(appId, message, loginUser);// 转换为 ServerSentEvent 格式return contentFlux.map(chunk -> {// 将内容包装成JSON对象Map<String, String> wrapper = Map.of("d", chunk);String jsonData = JSONUtil.toJsonStr(wrapper);return ServerSentEvent.<String>builder().data(jsonData).build();});
}
后端用 Flux 处理 AI 流式输出,通过 SSE 协议推送
重难点解析:
1 @GetMapping(value = "/chat/gen/code", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
produces = MediaType.TEXT_EVENT_STREAM_VALUE
:声明接口返回的数据类型为 text/event-stream
(SSE 协议的标准媒体类型),告诉浏览器:这是一个流式响应,需要保持连接并持续接收数据,而非一次性响应。
2 public Flux<ServerSentEvent<String>> chatToGenCode(...){}
将 AI 生成的流式代码片段,包装成符合 SSE协议的格式,持续推送给前端
设计原因:
- 如果只返回
Flux<String>
:数据是原始字符串,不符合 SSE 协议格式,前端EventSource
无法解析,会认为是无效数据。 - 如果返回单个
ServerSentEvent
:只能推送一次数据,无法实现 “持续流式输出”(失去了流式的核心意义)。 - 只有
Flux<ServerSentEvent<String>>
能同时满足:“持续发射数据”(Flux 的能力) 和 “数据符合 SSE 协议”(ServerSentEvent 的作用),从而实现前端实时接收流式数据的需求。
3 Flux<String> contentFlux = appService.chatToGenCode(appId, message, loginUser);
通过 Flux
实现 "边生成边返回",而非等待 AI 生成完整代码后一次性返回
4 chunk -> {// 将内容包装成JSON对象Map<String, String> wrapper = Map.of("d", chunk);String jsonData = JSONUtil.toJsonStr(wrapper);
chunk
是 AI 生成的单个代码片段(如<div class="login">
)。- 用
Map.of("d", chunk)
将片段包装成{"d": "片段内容"}
的Map结构,再将片段转为 JSON 字符串(键d
可自定义,需与前端解析逻辑对应),方便前端统一解析。
5 return ServerSentEvent.<String>builder().data(jsonData).build();
使用ServerSentEvent
类,将 JSON 格式的代码片段包装为符合 SSE 协议的事件对象。
6 .concatWith(Mono.just(// 发送结束事件ServerSentEvent.<String>builder().event("done").data("").build()))
在原始流的所有数据发送完毕后,额外追加一个自定义事件名为 done
的SSE事件。
前端代码
// 开始生成isGenerating.value = truegenerationProgress.value = 0await generateCode(message, aiMessageIndex)
}// 生成代码 - 使用 EventSource 处理流式响应
const generateCode = async (userMessage: string, aiMessageIndex: number) => {let eventSource: EventSource | null = nulllet streamCompleted = falsetry {// 获取 axios 配置的 baseURLconst baseURL = request.defaults.baseURL || API_BASE_URL// 构建URL参数const params = new URLSearchParams({appId: appId.value || '',message: userMessage,stream: 'true',})const url = `${baseURL}/app/chat/gen/code?${params}`// 创建 EventSource 连接eventSource = new EventSource(url, {withCredentials: true,})let fullContent = ''// 处理接收到的消息eventSource.onmessage = function (event) {if (streamCompleted) returntry {// 解析JSON包装的数据const parsed = JSON.parse(event.data)const content = parsed.d// 拼接内容if (content !== undefined && content !== null) {fullContent += contentmessages.value[aiMessageIndex].content = fullContentmessages.value[aiMessageIndex].loading = falsescrollToBottom()// 更新进度generationProgress.value = Math.min(90, generationProgress.value + 5)}} catch (error) {console.error('解析消息失败:', error)handleError(error, aiMessageIndex)}}// 处理done事件eventSource.addEventListener('done', function () {if (streamCompleted) returnstreamCompleted = trueisGenerating.value = falsegenerationProgress.value = 100// 延迟更新预览,确保后端已完成处理setTimeout(async () => {await fetchAppInfo()updatePreview()}, 1000)})
前端代码通过
EventSource
与后端 SSE 接口建立连接,通过onmessage
实时接收并解析后端推送的代码片段(与后端d
键对应),通过done
事件监听生成完成信号
重点代码解析
1 eventSource = new EventSource(url, {withCredentials: true, // 携带 cookies(如登录凭证),与后端用户认证对应});
创建 EventSource 连接(与后端 SSE 接口建立长连接)
2 eventSource.onmessage = function (event) {if (streamCompleted) return;try {// 解析后端返回的 JSON 数据(与后端 Map.of("d", chunk) 对应)const parsed = JSON.parse(event.data);const content = parsed.d; // 键 "d" 与后端一致// 拼接代码片段,实时更新界面(实现流式显示效果)if (content !== undefined && content !== null) {fullContent += content;messages.value[aiMessageIndex].content = fullContent; // 更新 UIscrollToBottom(); // 滚动到最新内容generationProgress.value = Math.min(90, generationProgress.value + 5); // 更新进度}} catch (error) {console.error('解析消息失败:', error);handleError(error, aiMessageIndex);}};
处理后端推送的流式数据(与后端 map 操作生成的 SSE 事件对应)
3 eventSource.addEventListener('done', function () {if (streamCompleted) return;streamCompleted = true; // 标记流已完成isGenerating.value = false; // 关闭生成状态generationProgress.value = 100; // 进度设为 100%// 延迟更新预览(确保后端处理完成)setTimeout(async () => {await fetchAppInfo();updatePreview(); // 生成完成后执行后续操作(如预览代码)}, 1000);});
处理后端发送的结束事件(与后端 concatWith 中的 done 事件对应)
对应关系
角色 | 技术实现 | 作用 |
---|---|---|
后端数据处理 | Flux<String> | 接收 AI 生成的流式片段(如代码片段) |
后端传输协议 | Flux<ServerSentEvent<String>> | 将 Flux 片段包装为 SSE 格式 |
前端接收协议 | EventSource | 建立 SSE 连接,接收后端推送的流式数据 |
完整工作流程
1.后端用 Flux 处理流式数据
appService.chatToGenCode(...)
调用 AI 模型,返回Flux<String>
—— 这个 Flux 会 “逐段” 发射 AI 生成的代码(比如先返回<html>
, 再返回<body>
, 等等)。- 后端通过
map
操作,将每个代码片段chunk
包装成 SSE 协议要求的格式(ServerSentEvent
对象,包含data
字段),确保符合 SSE 数据规范。
2.后端通过 SSE 协议推送数据
- 接口标注
produces = MediaType.TEXT_EVENT_STREAM_VALUE
,告诉浏览器:这是一个 SSE 流,需要保持连接并接收持续推送。 - 每个
ServerSentEvent
会被转换为 SSE 格式的文本(如data: {"d": "<html>"}
),通过长连接推送给前端。
3.前端用 EventSource 接收并处理
- 前端创建
EventSource
连接到后端接口,建立 SSE 长连接。 - 每收到一个 SSE 消息(
eventSource.onmessage
),就解析出代码片段content
,并实时更新到页面(messages.value[aiMessageIndex].content = fullContent
),实现 “逐段显示” 的流式效果。 - 额外处理
done
事件(生成完成)和business-error
事件(业务错误),完善交互逻辑。
核心配合点
- Flux 负责 “内部流式处理”:在后端接收 AI 生成的片段,通过响应式编程实现高效的异步处理。
- SSE 负责 “外部流式传输”:后端将 Flux 中的片段包装为 SSE 格式,通过 HTTP 长连接推给前端;前端用
EventSource
接收,完成从 “后端数据” 到 “用户界面” 的实时展示。 - 两者结合,既利用了 Flux 对后端流式数据的处理能力,又通过 SSE 协议实现了前端的实时接收,最终达成 “AI 生成内容逐段显示” 的效果。
大功告成!