流式输出阻塞原因及解决办法
流式输出不懂可看这篇文章:流式输出:概念、技巧与常见问题
正常情况,如下代码所示:
async def event_generator():# 先输出数字1yield "data: 1\n\n"# 然后每隔2秒输出数字2,共输出10次for i in range(10):await asyncio.sleep(2) # 等待2秒yield "data: 2\n\n"return StreamingResponse(event_generator(),media_type="text/event-stream",)
一、网络缓存
在FastAPI中实现Server-Sent Events (SSE)流式响应时,经常遇到响应数据不是实时发送给客户端,而是累积到一定程度后一次性发送的问题。这主要由以下几个原因造成:
- 网络层缓冲机制:网络协议栈为了提高传输效率,会将小的数据包进行缓冲合并,这种现象称为Nagle算法或TCP延迟确认。
- 应用层缓冲:
Web服务器(如uvicorn)和ASGI应用本身可能对响应内容进行缓冲
Python的异步框架在处理异步生成器时,可能会将多个yield值缓存起来批量处理 - 事件循环调度:
在同一个异步函数中连续调用yield时,事件循环可能不会立即切换到发送数据的协程
需要显式地让出控制权,使事件循环有机会处理数据发送
解决方法:
-
事件循环控制权让出:
await asyncio.sleep(0)会让当前协程暂停执行,并将控制权交还给事件循环
事件循环会处理其他待处理的任务,包括将已生成的数据发送给客户端 -
打破缓冲机制:
通过在每次yield后添加await asyncio.sleep(0),强制中断当前协程的连续执行
这给底层传输机制一个机会来刷新缓冲区并发送数据
async def event_generator():# 先输出数字1yield "data: 1\n\n"# 如果是下面这种就会产生阻塞,需要使用 await asyncio.sleep(0)for event in coze.chat.stream(bot_id=bot_id,user_id=user_id,additional_messages=user_messages,):if event.event == ChatEventType.CONVERSATION_MESSAGE_DELTA:yield f"data: {event.message.content}\n\n"# 强制刷新缓冲区,确保数据立即发送await asyncio.sleep(0)return StreamingResponse(event_generator(),media_type="text/event-stream",)
二、Nginx 缓存
Nginx 作为反向代理时,流式输出出现问题是很常见的问题。Nginx 默认会缓冲后端的响应,这会导致流式传输无法正常工作,主要原因包括:
- Nginx默认启用缓冲,会等待后端响应完成后再转发给客户端
- HTTP/1.0的连接行为可能中断流式传输
- 超时设置过短导致连接被提前关闭
第一步:在返回方法处,添加 headers 参数,如下
return StreamingResponse(generate_stream(agent_id, BOT_ID, file_id, content, background_tasks),media_type="text/event-stream",headers={"Cache-Control": "no-cache","Connection": "keep-alive","X-Accel-Buffering": "no" # 禁用Nginx缓冲})
第二步:如果第一步未解决,则修改Nginx配置文件,添加关键配置下面几行
location /api/ {proxy_pass http://127.0.0.1:8051/;proxy_set_header Host $host;proxy_set_header X-Real-IP $remote_addr;proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;# 关键配置:禁用缓冲以支持流式传输proxy_buffering off;proxy_cache off;proxy_read_timeout 86400s;proxy_send_timeout 86400s;}