当前位置: 首页 > news >正文

【Python笔记-FastAPI】后台任务+WebSocket监控进度

目录

一、代码示例

二、执行说明

(一) 调用任务执行接口

(二) 监控任务进度


实现功能:

  1. 注册后台任务(如:邮件发送、文件处理等异步场景,不影响接口返回)
  2. 监控后台任务执行进度(进度条功能)
  3. 支持根据任务ID查询对应任务进度

一、代码示例

#!/usr/bin/env python
# -*- coding: UTF-8 -*-
import random
import asynciofrom typing import List, Dict
from fastapi import FastAPI, BackgroundTasks, WebSocketapp = FastAPI()# 用于存储连接的 WebSocket 实例
connected_websockets: Dict[int, List[WebSocket]] = {}@app.websocket("/ws/{task_id}/")
async def websocket_endpoint(websocket: WebSocket, task_id: int):"""WebSocket路由,用于接收任务进度"""await websocket.accept()connected_websockets.setdefault(task_id, []).append(websocket)try:while True:await websocket.receive_text()except:connected_websockets[task_id].remove(websocket)@app.post("/task/{task_id}/")
async def start_task(background_tasks: BackgroundTasks, task_id: int):"""注册后台任务"""background_tasks.add_task(process_task, task_id=task_id)return {"task_id": task_id}async def process_task(task_id):"""处理任务的后台任务"""progress = 0while progress < 100:await asyncio.sleep(1)progress += random.randint(1, 10)progress = min(progress, 100)for ws in connected_websockets[task_id]:await ws.send_json({"task_id": task_id, "progress": progress})await asyncio.sleep(1)# 启动应用
if __name__ == "__main__":import uvicornuvicorn.run(app, host="0.0.0.0", port=8000)

二、执行说明

(一) 调用任务执行接口

  1. 启动服务后,访问:http://127.0.0.1:8000/docs
  2. POST请求:http://127.0.0.1:8000/task/1/,指定任务ID为1

(二) 监控任务进度

  1. 安装websocket请求工具:npm install -g wscat
  2. 终端输入wscat -c ws://127.0.0.1:8000/ws/1/,监控任务ID为1的执行进度

http://www.lryc.cn/news/311286.html

相关文章:

  • 力扣hot100:15.三数之和(双指针/哈希表)
  • VMware虚拟机使用Windows共享的文件夹
  • 利用Python自动化日常任务
  • Android的多线程和异步处理
  • MySQL-----视图
  • LeetCode-02
  • 瑞_Redis_Redis的Java客户端
  • Cmake的使用
  • linux系统ELK组件介绍
  • 回归预测 | Matlab实现BiTCN基于双向时间卷积网络的数据回归预测
  • Tailscale中继服务derper使用docker-compose部署
  • Spring Cloud 实战系列之 Zuul 微服务网关搭建及配置
  • 【数据结构】队列
  • 学习JAVA的第十三天(基础)
  • C++--机器人的运动范围
  • 深度学习API——keras初学
  • Web APIs知识点讲解(阶段二)
  • 多平台拼音输入法软件的开发
  • Flutter学习7 - Dart 泛型
  • Git 基本操作 ⼯作区、暂存区、版本库
  • 利用Vue3的新API(customRef)实现防抖效果
  • 【Linux】在 Ubuntu 系统下使用 Screen 运行 Python 脚本
  • jxls——自定义命令设置动态行高
  • 前端面试练习24.3.2-3.3
  • 优先级队列(Java )
  • 大宋咨询如何进行汽车门店6S标准现场检查
  • 仿牛客网项目---点赞模块的实现
  • 【AI视野·今日CV 计算机视觉论文速览 第300期】Fri, 1 Mar 2024
  • 【单片机学习的准备】
  • 力扣hot100:438.找到字符串中所有字母异位词