当前位置: 首页 > news >正文

Flask——基于python完整实现客户端和服务器后端流式请求及响应

文章目录

    • 本地客户端
    • Flask服务器后端
    • 客户端/服务器端流式接收[打字机]效果

看了很多相关博客,但是都没有本地客户端和服务器后端的完整代码示例,有的也只说了如何流式获取后端结果,基本没有讲两端如何同时实现流式输入输出,特此整理总结,给大家交流学习和使用!

本地客户端

  • requests.post得到流式回复的重要参数:
    • stream:需要设置为True;
    • response.iter_content:使用该函数接收返回的流式数据。
import requests
import time
import jsondef generate_stream_data():# 假设这是要发送的文本列表is_end = Falselines = ["Hello", "world", "this", "is", "a", "stream", "of", "text"]for line in lines:print(line)if lines.index(line) == len(lines) - 1:is_end = Trueyield json.dumps({'line': line, 'is_end': is_end}) + '\n'time.sleep(0.5)# 模拟数据处理时间def get_stream_response(response):# 流式接收responserec_data_list = []temp_data = ''for chunk in response.iter_content(chunk_size=1):temp_data += chunk.decode('utf-8')if temp_data.endswith('\n'):temp_json = json.loads(temp_data)rec_data_list.append(temp_json)print(temp_data)temp_data = ''if temp_json['is_end']:breakprint(rec_data_list)print("----------------------------")print(temp_data)return rec_data_listdef stream_upload(url):# 流式接收responseresponse = requests.post(url, data=generate_stream_data(), stream=True)final_response = get_stream_response(response)return final_responseurl = 'http://127.0.0.1:5000/stream'
response = stream_upload(url)

Flask服务器后端

  • flask.request流式获取数据::
    • 使用request.stream.read读取数据,而不是get_data()等一次性函数。
from flask import Flask, Response, request
import time
import json
import requestsapp = Flask(__name__)def process_stream_data(stream_data):# 假设这是要发送的数据print("开始生成新的数据流")is_end = Falseprint(stream_data)for idx, line in enumerate(stream_data):if idx == len(stream_data)-1:is_end = Trueprint(line)yield json.dumps(line)+"\n"time.sleep(0.5)# 模拟数据处理时间def get_stream_request(chunk_size=1):req_data_list = []temp_data = ''while True:chunk = request.stream.read(chunk_size)temp_data += chunk.decode('utf-8')if temp_data.endswith('\n'):temp_json = json.loads(temp_data)req_data_list.append(temp_json)print(temp_data)temp_data = ''if temp_json['is_end']:return req_data_list@app.route('/stream', methods=['POST'])
def stream_text():data = get_stream_request()print("----------------------------")return Response(process_stream_data(data))if __name__ == "__main__":app.run(host='0.0.0.0', port=5000, debug=True)

客户端/服务器端流式接收[打字机]效果

请添加图片描述

http://www.lryc.cn/news/304360.html

相关文章:

  • crmeb多门店商城系统二次开发 增加车辆车牌搜索功能、车辆公里数
  • 深度好文|关于人类智能与自主系统
  • 防火墙内容安全笔记
  • 应用于温度报警器中的高精度温度传感芯片
  • 微信小程序swiper 视频中间大,两边小,轮播滑到中间视频自动播放组件教程
  • ARM服务器上部署zookeeper集群
  • 利用Ubuntu22.04启动U盘对电脑磁盘进行格式化
  • Nginx基础入门
  • 分布式和微服务
  • 【无标题】学习Markdown
  • 由于 vscode 版本更新为 1.86.1引起的相关问题。
  • 四、矩阵的分类
  • Windows环境下查看磁盘层级占用空间的解决方案
  • 超级实用的python代码片段汇总和详细解析(16个)
  • npm/nodejs安装、切换源
  • 【Kotlin】流程控制
  • Devc++ Easyx 实现 瓦片地图编辑数据导入游戏
  • 去年面试的运维开发面试题二
  • 【Unity编辑器扩展】Unity编辑器主题颜色设置工具
  • 精美的WordPress外贸独立站模板
  • 说一下 JVM 运行时数据区 ?
  • 外泌体相关基因肝癌临床模型预测——2-3分纯生信文章复现——02.数据格式整理(1)
  • Python 内存管理和优化之循环引用
  • 「Kafka」监控、集成篇
  • Linux之用户和用户组用户账号系统文件
  • ESP8266 (5),驱动屏幕
  • ChatGPT-01 用ChatGPT指令,自学任何领域的系统知识
  • android studio模拟器不能打开
  • 设计模式学习笔记 - 面向对象 - 5.接口和抽象类的区别
  • PolarDN MISC做题笔记