当前位置: 首页 > news >正文

LangChain4j:基于 SSE 与 Flux 的 AI 流式对话实现方案

前言:本文聚焦 SSE Flux 组合,解析二者如何协作:Flux 处理后端 AI 流式输出SSE 将片段推向前端,实现 AI 内容逐段实时展示,为流式交互提供高效解决方案。

流式对话的核心需求

流式对话(如 ChatGPT、豆包等 AI 交互)的核心诉求是 实时、流畅、低延迟地展示动态生成的内容”,具体可拆解为以下需求点:

  1. 单向持续推送
    流式对话的核心数据流向是 "服务器→客户端":用户发送一次提问后,AI 模型在后端逐步生成回答(通常是逐字 / 逐句生成),需要实时将中间结果推送给客户端,而客户端在此过程中无需向服务器发送额外数据,仅需等待结果即可。

  2. 低延迟的片段化传输
    AI 生成回答时,实时返回已生成题目给前端,从而给用户及时的回答,而不是让前端请求一直阻塞等待,最后一起返回。

SSE与Flux结合恰好针对性地满足了上述需求:

Flux 作为 Java 响应式编程的核心类,负责后端流式数据处理 —— 以异步数据流形式封装 AI 逐字生成的内容,支持片段化数据的逐段发射,完美适配 AI “边生成边输出” 的逻辑,是后端处理流式数据的基础工具。

SSE 则专注于网络传输层面的单向推送 —— 基于 HTTP 长连接,客户端一次连接后,服务器可持续推送数据,与 AI 对话中 “后端→客户端” 的单向数据流完全匹配,高效实现实时传输。

两者配合,Flux 在后端接收并处理 AI 生成的片段,SSE 通过长连接将这些片段实时推送给前端,最终实现 “逐字显示” 的流畅体验,在实时性与资源消耗间达到平衡,是流式对话的理想选择。

Flux技术

Flux 是 Java 响应式编程库(如 Project Reactor)中的核心类,专门用于处理异步、流式的数据序列,可以理解为 “能动态产生多个数据元素的数据流容器”。它的设计非常适合需要逐段生成、实时处理数据的场景(如 AI 流式输出、实时日志等)

在 AI 流式场景中的作用

以 AI 对话为例:

当调用 AI 模型生成回答时,模型并非一次性输出完整内容,而是逐字 / 逐句计算(类似人 “边想边说”)。这些实时生成的片段(如 “你”“好”)会被依次 “发射” 到 Flux 中,形成一个持续的数据流。后端可通过 Flux 的 map 操作将片段转换为 SSE 格式,再推送给前端,实现 “逐字显示” 的效果。

简单说,Flux 是后端 “接住” 并处理 AI 流式输出的 “传送带”,让数据能按生成顺序实时流动、加工,为后续的传输(如通过 SSE 推给前端)提供基础。

我们可以对 Flux 对象进行下列操作:

SSE 技术

基本概念

服务器发送事件(Server-Sent Events)是一种用于从服务器到客户端的 单向、实时 数据传输技术,基于 HTTP协议实现。

它有几个重要的特点:

  1. 单向通信:SSE 只支持服务器向客户端的单向通信,客户端不能向服务器发送数据。
  2. 文本格式:SSE 使用 纯文本格式 传输数据,使用 HTTP 响应的 text/event-stream MIME 类型。
  3. 保持连接:SSE 通过保持一个持久的 HTTP 连接,实现服务器向客户端推送更新,而不需要客户端频繁轮询。
  4. 自动重连:如果连接中断,浏览器会自动尝试重新连接,确保数据流的连续性。

SSE 数据格式

SSE 数据流的格式非常简单,使用 event 指定事件名称,用于区分不同类型的消息。每个事件使用 data 字段,作为消息主体,事件以两个换行符结束。还可以使用 id 字段来标识事件,并且 retry 字段可以设置重新连接的时间间隔。

event: 事件名\n    // 可选,用于区分不同类型的消息
data: 消息内容\n    // 必选,消息主体(可多行,每行以 data: 开头)
id: 消息ID\n       // 可选,用于客户端记录最后接收的消息ID(重连时可通过 Last-Event-ID 头传递)
retry: 重连时间(毫秒)\n  // 可选,指定客户端重连间隔
\n  // 空行表示一条消息结束

示例格式如下:

data: Third message\n
id: 3\n
\n
retry: 10000\n
data: Fourth message\n
\n

SSE 与 Flux 的结合实现AI输出流式对话

后端代码

@GetMapping(value = "/chat/gen/code", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<String>> chatToGenCode(@RequestParam Long appId,@RequestParam String message,HttpServletRequest request) {// 参数校验ThrowUtils.throwIf(appId == null || appId <= 0, ErrorCode.PARAMS_ERROR, "应用ID无效");ThrowUtils.throwIf(StrUtil.isBlank(message), ErrorCode.PARAMS_ERROR, "用户消息不能为空");// 获取当前登录用户User loginUser = userService.getLoginUser(request);// 调用服务生成代码(流式)Flux<String> contentFlux = appService.chatToGenCode(appId, message, loginUser);// 转换为 ServerSentEvent 格式return contentFlux.map(chunk -> {// 将内容包装成JSON对象Map<String, String> wrapper = Map.of("d", chunk);String jsonData = JSONUtil.toJsonStr(wrapper);return ServerSentEvent.<String>builder().data(jsonData).build();});
}

后端用 Flux 处理 AI 流式输出,通过 SSE 协议推送

重难点解析:

1  @GetMapping(value = "/chat/gen/code", produces = MediaType.TEXT_EVENT_STREAM_VALUE)

produces = MediaType.TEXT_EVENT_STREAM_VALUE声明接口返回的数据类型为 text/event-stream(SSE 协议的标准媒体类型),告诉浏览器:这是一个流式响应,需要保持连接并持续接收数据,而非一次性响应。

2 public Flux<ServerSentEvent<String>> chatToGenCode(...){}

将 AI 生成的流式代码片段,包装成符合 SSE协议的格式,持续推送给前端

设计原因:

  • 如果只返回 Flux<String>:数据是原始字符串,不符合 SSE 协议格式,前端 EventSource 无法解析,会认为是无效数据。
  • 如果返回单个 ServerSentEvent:只能推送一次数据,无法实现 “持续流式输出”(失去了流式的核心意义)。
  • 只有 Flux<ServerSentEvent<String>> 能同时满足:“持续发射数据”(Flux 的能力) 和 “数据符合 SSE 协议”(ServerSentEvent 的作用),从而实现前端实时接收流式数据的需求。

3 Flux<String> contentFlux = appService.chatToGenCode(appId, message, loginUser);

通过 Flux 实现 "边生成边返回",而非等待 AI 生成完整代码后一次性返回

4    chunk -> {// 将内容包装成JSON对象Map<String, String> wrapper = Map.of("d", chunk);String jsonData = JSONUtil.toJsonStr(wrapper);
  • chunk 是 AI 生成的单个代码片段(如 <div class="login">)。
  • 用 Map.of("d", chunk) 将片段包装成 {"d": "片段内容"} 的Map结构,再将片段转为 JSON 字符串(键 d 可自定义,需与前端解析逻辑对应),方便前端统一解析。

5    return ServerSentEvent.<String>builder().data(jsonData).build();

使用ServerSentEvent 类,将 JSON 格式的代码片段包装为符合 SSE 协议的事件对象。

6    .concatWith(Mono.just(// 发送结束事件ServerSentEvent.<String>builder().event("done").data("").build()))

在原始流的所有数据发送完毕后,额外追加一个自定义事件名为 done的SSE事件

前端代码

  // 开始生成isGenerating.value = truegenerationProgress.value = 0await generateCode(message, aiMessageIndex)
}// 生成代码 - 使用 EventSource 处理流式响应
const generateCode = async (userMessage: string, aiMessageIndex: number) => {let eventSource: EventSource | null = nulllet streamCompleted = falsetry {// 获取 axios 配置的 baseURLconst baseURL = request.defaults.baseURL || API_BASE_URL// 构建URL参数const params = new URLSearchParams({appId: appId.value || '',message: userMessage,stream: 'true',})const url = `${baseURL}/app/chat/gen/code?${params}`// 创建 EventSource 连接eventSource = new EventSource(url, {withCredentials: true,})let fullContent = ''// 处理接收到的消息eventSource.onmessage = function (event) {if (streamCompleted) returntry {// 解析JSON包装的数据const parsed = JSON.parse(event.data)const content = parsed.d// 拼接内容if (content !== undefined && content !== null) {fullContent += contentmessages.value[aiMessageIndex].content = fullContentmessages.value[aiMessageIndex].loading = falsescrollToBottom()// 更新进度generationProgress.value = Math.min(90, generationProgress.value + 5)}} catch (error) {console.error('解析消息失败:', error)handleError(error, aiMessageIndex)}}// 处理done事件eventSource.addEventListener('done', function () {if (streamCompleted) returnstreamCompleted = trueisGenerating.value = falsegenerationProgress.value = 100// 延迟更新预览,确保后端已完成处理setTimeout(async () => {await fetchAppInfo()updatePreview()}, 1000)})

前端代码通过 EventSource 与后端 SSE 接口建立连接,通过 onmessage 实时接收并解析后端推送的代码片段(与后端 d 键对应),通过 done 事件监听生成完成信号

重点代码解析

1    eventSource = new EventSource(url, {withCredentials: true, // 携带 cookies(如登录凭证),与后端用户认证对应});

创建 EventSource 连接(与后端 SSE 接口建立长连接)

2    eventSource.onmessage = function (event) {if (streamCompleted) return;try {// 解析后端返回的 JSON 数据(与后端 Map.of("d", chunk) 对应)const parsed = JSON.parse(event.data);const content = parsed.d; // 键 "d" 与后端一致// 拼接代码片段,实时更新界面(实现流式显示效果)if (content !== undefined && content !== null) {fullContent += content;messages.value[aiMessageIndex].content = fullContent; // 更新 UIscrollToBottom(); // 滚动到最新内容generationProgress.value = Math.min(90, generationProgress.value + 5); // 更新进度}} catch (error) {console.error('解析消息失败:', error);handleError(error, aiMessageIndex);}};

处理后端推送的流式数据(与后端 map 操作生成的 SSE 事件对应)

3    eventSource.addEventListener('done', function () {if (streamCompleted) return;streamCompleted = true; // 标记流已完成isGenerating.value = false; // 关闭生成状态generationProgress.value = 100; // 进度设为 100%// 延迟更新预览(确保后端处理完成)setTimeout(async () => {await fetchAppInfo();updatePreview(); // 生成完成后执行后续操作(如预览代码)}, 1000);});

处理后端发送的结束事件(与后端 concatWith 中的 done 事件对应)

对应关系

角色技术实现作用
后端数据处理Flux<String>接收 AI 生成的流式片段(如代码片段)
后端传输协议Flux<ServerSentEvent<String>>将 Flux 片段包装为 SSE 格式
前端接收协议EventSource建立 SSE 连接,接收后端推送的流式数据

完整工作流程

1.后端用 Flux 处理流式数据

  • appService.chatToGenCode(...) 调用 AI 模型,返回 Flux<String>—— 这个 Flux 会 “逐段” 发射 AI 生成的代码(比如先返回 <html>, 再返回 <body>, 等等)。
  • 后端通过 map 操作,将每个代码片段 chunk 包装成 SSE 协议要求的格式(ServerSentEvent 对象,包含 data 字段),确保符合 SSE 数据规范。

2.后端通过 SSE 协议推送数据

  • 接口标注 produces = MediaType.TEXT_EVENT_STREAM_VALUE,告诉浏览器:这是一个 SSE 流,需要保持连接并接收持续推送。
  • 每个 ServerSentEvent 会被转换为 SSE 格式的文本(如 data: {"d": "<html>"}),通过长连接推送给前端。

3.前端用 EventSource 接收并处理

  • 前端创建 EventSource 连接到后端接口,建立 SSE 长连接。
  • 每收到一个 SSE 消息(eventSource.onmessage),就解析出代码片段 content,并实时更新到页面(messages.value[aiMessageIndex].content = fullContent),实现 “逐段显示” 的流式效果。
  • 额外处理 done 事件(生成完成)和 business-error 事件(业务错误),完善交互逻辑。

核心配合点

  • Flux 负责 “内部流式处理”:在后端接收 AI 生成的片段,通过响应式编程实现高效的异步处理。
  • SSE 负责 “外部流式传输”:后端将 Flux 中的片段包装为 SSE 格式,通过 HTTP 长连接推给前端;前端用 EventSource 接收,完成从 “后端数据” 到 “用户界面” 的实时展示。
  • 两者结合,既利用了 Flux 对后端流式数据的处理能力,又通过 SSE 协议实现了前端的实时接收,最终达成 “AI 生成内容逐段显示” 的效果。

大功告成!

http://www.lryc.cn/news/623018.html

相关文章:

  • lesson40:PyMySQL完全指南:从基础到高级的Python MySQL交互
  • 数据结构:层序遍历 (Level-order Traversal)
  • 图论Day4学习心得
  • Kafka 面试题及详细答案100道(11-22)-- 核心机制1
  • 代码随想录Day52:图论(孤岛的总面积、沉没孤岛、水流问题、建造最大岛屿)
  • Cmake学习笔记
  • 代码随想录算法训练营四十三天|图论part01
  • 数字化与人工智能的崛起及其社会影响研究报告
  • 基于uni-app+vue3实现的微信小程序地图范围限制与单点标记功能实现指南
  • Altium Designer 22使用笔记(7)---网表导入,叠层设置
  • 【电路笔记 通信】AXI4-Lite协议 论文阅读 简化的高级可扩展接口(AdvancedeXtensibleInterface4Lite)
  • 【计算机网络架构】混合型架构简介
  • 车载诊断架构 --- 怎么解决对已量产ECU增加具体DTC的快照信息?
  • 超越Transformer:大模型架构创新的深度探索
  • 【自动化运维神器Ansible】Ansible逻辑运算符详解:构建复杂条件判断的核心工具
  • 11、软件需求工程
  • 【系统分析师】软件需求工程——第11章学习笔记(下)
  • 架构调整决策
  • 软件需求管理过程详解
  • M-LAG双活网关
  • linux I2C核心、总线与设备驱动
  • 特洛伊木马和后门程序的定义、联系、区别与应用场景
  • UE5多人MOBA+GAS 45、制作冲刺技能
  • 深入详解PCB布局布线技巧-去耦电容的摆放位置
  • 【AndroidStudio修改中文设置】
  • 玉米及淀粉深加工产业展|2026中国(济南)国际玉米及淀粉深加工产业展览会
  • UE5多人MOBA+GAS 46、制作龙卷风技能
  • 机器学习——PCA算法
  • 心路历程-学Linux的开端
  • 【php反序列化介绍与常见触发方法】