当前位置: 首页 > news >正文

流式接口,断点续传解决方案及实现

背景:用户刷新页面或者切换tab页后断链的续流问题

目前有两种方案:

方案一:后端:Mongo + 实时存 (数据库压力大,每个流都要进行一次入库操作)    前端轮询

方案二:后端:Redis + Mongo (推荐, 流的过程中使用Redis,流数据结束后再去入一次库)

              前端:调新接口

方案一实现:

public SseEmitter sendinfo (QuestionnaireDTO dto) {       Flux<String> flux = algorithmUtils.code_bswj(dto, webClient);flux.doOnError(e -> {try {completeEmitter(emitter, e, isCompleted);} catch (IOException ex) {throw new RuntimeException(ex);}})// 处理客户端断开连接.doOnComplete(() -> {try {completeEmitter(emitter, null, isCompleted);} catch (IOException e) {throw new RuntimeException(e);}}) // 传null表示正常完成.subscribe(data -> {try {sendDataToEmitter(emitter, data, isCompleted);} catch (IOException e) {throw new RuntimeException(e);}});// 订阅 Flux 并发送数据到 SseEmitterreturn emitter;}private void sendDataToEmitter(SseEmitter emitter, String data, AtomicBoolean isCompleted) throws IOException {if (!isCompleted.get()) {try {processChunk(data, questionnaire,isCompleted,  traceId);} catch (IOException | JSONException e) {if (e instanceof ClientAbortException) {this.emitter = new SseEmitterUTF8(1000000L);return;}completeEmitter(emitter, e, isCompleted,questionnaire); // 处理发送过程中的异常}}}private void processChunk( String chunk, Questionnaire questionnaire, AtomicBoolean isCompleted) throws IOException, JSONException {System.out.println("chunk handing before =========>:" + chunk);if (!StringUtils.hasText(chunk)) return;JSONObject jsonObject = new JSONObject(chunk);if (!jsonObject.getBoolean("is_success")) {emitter.send(jsonObject.getString("err_msg"));return;}String result = jsonObject.getString("results");JSONObject jsonObjectRes = new JSONObject(result);String type = jsonObjectRes.getString("type");//answerString value = jsonObjectRes.getString("value");JSONObject response = new JSONObject();JSONObject results = new JSONObject();if (type.equals("progress_indicator")) {JSONObject valueJson = new JSONObject(value);String text = valueJson.getString("text");if (text.equals("[DONE]")) {questionnaireRepository.save(questionnaire);completeEmitter(emitter, null, isCompleted, questionnaire);return;}results.put("type", "progress_indicator");results.put("data", text);if (isBase) {String progressBase = questionnaire.getProgressBase();progressBase = progressBase == null ? "" : progressBase;progressBase += text;questionnaire.setProgressBase(progressBase);} else {String progressCustom = questionnaire.getProgressCustom();progressCustom = progressCustom == null ? "" : progressCustom;progressCustom += text;questionnaire.setProgressCustom(progressCustom);}questionnaireRepository.save(questionnaire);} else if (type.equals("survey")) {JSONObject valueJson = new JSONObject(value);String customSurvey = valueJson.getString("custom_survey");questionnaire.setCustomQuestion(customSurvey);questionnaireRepository.save(questionnaire);results.put("type", "survey");results.put("data", customSurvey);}else if (type.equals("finished_thinking")) {JSONObject valueJson = new JSONObject(value);String text = valueJson.getString("text");results.put("type", "finished_thinking");results.put("data", text);if (isBase) {questionnaire.setStatus("SurveyGenerating");}else {questionnaire.setStatus("MapGenerating");}questionnaireRepository.save(questionnaire);}response.put("results", results);if (!isCompleted.get()){System.out.println("后端发送给前端时间:" + TimeUtil.getCurrentTime());emitter.send(response.toString());}}

通过实时插入数据库,前端感知到刷新或者切换tab后,根据状态轮询调用历史记录接口,因为此时后端与算法的流还没有断开,所以是在实时保存的。前端此时轮询历史接口可以伪造出流式输出的形式,使用户无感知。

方案二实现:(代码几乎同上,就是在处理流式的方法里做了改动)

private void processChunk( String chunk, Questionnaire questionnaire, AtomicBoolean isCompleted, String traceId) throws IOException, JSONException {System.out.println("chunk handing before =========>:" + chunk);if (!StringUtils.hasText(chunk)) return;JSONObject jsonObject = new JSONObject(chunk);if (!jsonObject.getBoolean("is_success")) {emitter.send(jsonObject.getString("err_msg"));return;}String result = jsonObject.getString("results");JSONObject jsonObjectRes = new JSONObject(result);String type = jsonObjectRes.getString("type");//answerString value = jsonObjectRes.getString("value");JSONObject response = new JSONObject();JSONObject results = new JSONObject();if (type.equals("progress_indicator")) {JSONObject valueJson = new JSONObject(value);String text = valueJson.getString("text");if (text.equals("[DONE]")) {questionnaireRepository.save(questionnaire);//设置redis中key为questionnaire.getId()的过期时间为5分钟,目前redis中是有这个key的
//                redisService.expire(questionnaire.getId(), 1);completeEmitter(emitter, null, isCompleted, questionnaire);return;}results.put("type", "progress_indicator");results.put("data", text);if (isBase) {String progressBase = questionnaire.getProgressBase();progressBase = progressBase == null ? "" : progressBase;progressBase += text;questionnaire.setProgressBase(progressBase);} else {String progressCustom = questionnaire.getProgressCustom();progressCustom = progressCustom == null ? "" : progressCustom;progressCustom += text;questionnaire.setProgressCustom(progressCustom);}} else if (type.equals("survey")) {JSONObject valueJson = new JSONObject(value);String customSurvey = valueJson.getString("custom_survey");questionnaire.setCustomQuestion(customSurvey);questionnaireRepository.save(questionnaire);accompanyLearningLog.uploadLogByTranceId("processChunk", "后端处理定制问卷", "INFO", JsonUtil.object2Json(customSurvey),traceId);results.put("type", "survey");results.put("data", customSurvey);} else if (type.equals("text")) {JSONObject valueJson = new JSONObject(value);String text = valueJson.getString("text");results.put("type", "text");results.put("data", text);questionnaire.setPlanCustom(text);accompanyLearningLog.uploadLogByTranceId("processChunk", "后端处理多链路地图", "INFO", JsonUtil.object2Json(text),traceId);questionnaireRepository.save(questionnaire);}else if (type.equals("finished_thinking")) {JSONObject valueJson = new JSONObject(value);String text = valueJson.getString("text");results.put("type", "finished_thinking");results.put("data", text);if (isBase) {questionnaire.setStatus("SurveyGenerating");}else {questionnaire.setStatus("MapGenerating");}questionnaireRepository.save(questionnaire);}response.put("results", results);//这里有了一个新的逻辑,把思维链保存的redis的队列中,然后如果前端断开连接,想要做断点续传,就从队列中取出,然后继续,队列的key是questionnaire.getId()// 把思维链保存到 Redis 队列中redisService.rightPush(questionnaire.getId(), response.toString());if (!isCompleted.get()){System.out.println("后端发送给前端时间:" + TimeUtil.getCurrentTime());emitter.send(response.toString());}}

前端刷新页面或者来回切换了tab后会调用我的新接口,也是一个流式输出的接口

    /*** 断点续传方法,用于从 Redis 中获取数据并通过 SseEmitter 发送给前端。* 如果数据库中有符合条件的数据则直接处理,否则从 Redis 中获取数据。*/@Overridepublic SseEmitter refreshStream(QuestionnaireDTO dto, String userId, String traceId) {// 创建一个 SseEmitter 对象,设置超时时间为 1000000 毫秒SseEmitter emitter = new SseEmitterUTF8(1000000L);// 先从数据库取数据,如果取到了就不查redis了/*Questionnaire questionnaire = questionnaireRepository.findByUserId(userId);if (questionnaire != null) {try {if(questionnaire.getStatus().equals("baseCompleted")||questionnaire.getStatus().equals("customCompleted")){// 若问卷状态为 baseCompleted 或 customCompleted,发送 [DONE] 并完成 SseEmitteremitter.send("[DONE]");emitter.complete();}return emitter;} catch (Exception e) {// 处理异常,将错误信息记录到日志并完成 SseEmitteremitter.completeWithError(e);accompanyLearningLog.uploadLogByTranceId("refreshStream", "从数据库获取数据发送失败", "ERROR", JsonUtil.object2Json(e), traceId);return emitter;}}*/// 获取 Redis 中数据列表的键String key = dto.getId();// 从redis中取key为dto.getId()的列表的长度Long listLength = redisService.size(key);// 若 Redis 列表长度不为空且大于 0if (listLength != null && listLength > 0) {// 记录这个长度final long[] initialLength = {listLength};// 从redis中取列表中的数据List<Object> redisDataList = redisService.range(key, 0, initialLength[0] - 1);// 用于拼接 type 为 progress_indicator 的 data 数据StringBuilder progressData = new StringBuilder();// 标记 SseEmitter 是否已经完成AtomicBoolean isEmitterCompleted = new AtomicBoolean(false);// 遍历 Redis 数据列表for (Object data : redisDataList) {try {// 将数据转换为 JSONObjectJSONObject jsonData = new JSONObject(data.toString());// 获取 results 字段JSONObject results = jsonData.getJSONObject("results");// 获取 type 字段String type = results.getString("type");if("generate_indicator".equals(type)){// 若 type 为 generate_indicator,直接发送数据emitter.send(data.toString());}else  if ("progress_indicator".equals(type)) {// 若 type 为 progress_indicator,拼接 data 数据progressData.append(results.getString("data"));} else {// 若 SseEmitter 未完成,发送非 progress_indicator 类型的数据if (!isEmitterCompleted.get()) {System.out.println("data:" + data.toString());emitter.send(data.toString());}}} catch (JSONException e) {}catch (IOException e) {// 处理发送数据时的 IO 异常,完成 SseEmitter 并记录日志emitter.completeWithError(e);}}// 以{"results":{"type":"progress_indicator","data":"所有的data"}}格式推给前端try {// 若 progressData 不为空if (progressData.length() > 0) {// 创建响应 JSON 对象JSONObject response = new JSONObject();JSONObject results = new JSONObject();results.put("type", "progress_indicator");results.put("data", progressData.toString());response.put("results", results);System.out.println("response:"+response.toString());// 若 SseEmitter 未完成,发送拼接后的 progress_indicator 数据if (!isEmitterCompleted.get()) {emitter.send(response.toString());}}// 从记录的长度开始,后面的就不需要拼接了,直接取到后推给前端就可以了// 使用数组包装 subscriptionfinal Disposable[] subscription = new Disposable[1];// 每秒检查一次 Redis 列表长度是否有变化subscription[0]  =  Flux.interval(Duration.ofSeconds(1)).subscribe(interval -> {// 获取当前 Redis 列表长度Long currentLength = redisService.size(key);if (currentLength != null && currentLength > initialLength[0]) {// 获取新增的数据List<Object> newDataList = redisService.range(key, initialLength[0], currentLength - 1);for (Object newData : newDataList) {try {if(newData.toString().equals("[DONE]")){// 若数据为 [DONE],发送数据,完成 SseEmitter,取消订阅并删除 Redis 键if (!isEmitterCompleted.get()) {emitter.send(newData.toString());emitter.complete();isEmitterCompleted.set(true);// 取消订阅if (subscription[0] != null) {subscription[0].dispose();}redisService.delete(key);}return;}System.out.println("newData:"+newData.toString());// 若 SseEmitter 未完成,发送新增数据if (!isEmitterCompleted.get()) {emitter.send(newData.toString());try {// 线程休眠 50 毫秒Thread.sleep(50);} catch (InterruptedException e) {// 处理线程休眠中断异常,完成 SseEmitter Thread.currentThread().interrupt();emitter.completeWithError(e);}}} catch (IOException e) {emitter.completeWithError(e);}}// 更新初始长度initialLength[0] = currentLength;}});} catch (Exception e) {// 处理异常,完成 SseEmitter 并记录日志emitter.completeWithError(e);}} else {try {// 若 Redis 列表长度为空或为 0,完成 SseEmitteremitter.complete();} catch (Exception e) {// 处理异常,完成 SseEmitter 并记录日志emitter.completeWithError(e);}}return emitter;}

http://www.lryc.cn/news/600120.html

相关文章:

  • QKV 为什么是三个矩阵?注意力为何要除以 √d?多头注意力到底有啥用?
  • 【PyTorch】图像多分类项目
  • jwt 在net9.0中做身份认证
  • qt框架,使用webEngine如何调试前端
  • 开发笔记 | 优化对话管理器脚本与对话语音的实现
  • 13.使用C连接mysql
  • Jenkins中出现pytest: error: unrecognized arguments: --alluredir=report错误解决办法
  • 栈----1.有效的括号
  • 机器学习 KNN 算法,鸢尾花案例
  • 从Taro的Dialog.open出发,学习远程控制组件之【事件驱动】
  • C++ 多线程同步机制详解:互斥锁、条件变量与原子操作
  • Python Multiprocessing 进程池完全教程:从理论到实战
  • 数据结构(3)单链表
  • [尚庭公寓]14-找房模块
  • Canal 1.1.7的安装
  • 习题5.6 “数学黑洞“
  • PHP插件开发中的一个错误:JSON直接输出导致网站首页异常
  • 纸板留声机:用ESP32和NFC打造会唱歌的复古装置
  • 手语式映射:Kinova Gen3 力控机械臂自适应控制的研究与应用
  • 秒收蜘蛛池解析机制的原理
  • PPIO上线阿里旗舰推理模型Qwen3-235B-A22B-Thinking-2507
  • ATR2652SGNSS全频段低噪声放大器
  • PostgreSQL对象权限管理
  • GPU 驱动安装升级测试
  • [NPUCTF2020]ReadlezPHP
  • CSS 盒子模型学习版的理解
  • C语言第 9 天学习笔记:数组(二维数组与字符数组)
  • ODFM(正交频分复用)系统中加入汉明码(Hamming Code)的主要目的是增强抗误码能力,通过**前向纠错(FEC)**机制提高传输可靠性
  • KNN算法:从原理到实战全解析
  • Kubernetes深度解析:企业级容器编排平台的核心实践