当前位置: 首页 > news >正文

Celery+RabbitMQ+Redis

FastApi异步任务Demo应用构建:Celery+RabbitMQ+Redis

以下是基于 FastAPI 和 Celery 构建分布式系统的核心技术方案及实现要点,结合高性能、可扩展性和生产级实践:


⚙️ 一、技术栈核心优势

  1. FastAPI

    • 异步支持(ASGI):非阻塞 I/O 处理高并发请求。
    • 自动 API 文档:集成 Swagger UI,简化接口调试。
    • 数据验证:通过 Pydantic 模型确保请求/响应数据完整性。
  2. Celery

    • 分布式任务队列:将耗时任务(如支付处理、数据分析)卸载到后台 Worker。
    • 定时任务:通过 celery beat 实现周期任务(如订单超时取消)。
    • 多节点扩展:支持横向扩展 Worker 应对高负载。
  3. Redis

    • 消息代理(Broker):传递任务消息到 Celery Worker。
    • 结果存储(Backend):持久化任务状态和执行结果。

🏗️ 二、分布式架构设计

推送任务
分发任务
分发任务
存储结果
存储结果
查询状态
FastAPI
Redis Broker
Celery Worker 1
Celery Worker 2
Redis Backend
  • 请求流程

    1. 用户请求 FastAPI 接口(如创建订单)。
    2. FastAPI 调用 task.delay() 将任务发送至 Redis 队列。
    3. Celery Worker 监听队列并执行任务(如库存扣减)。
    4. 结果存入 Redis,FastAPI 通过任务 ID 查询状态。
  • 关键组件

    • Celery Worker:执行异步任务,支持多进程并发(--concurrency=4)。
    • Celery Beat:调度定时任务(如每 30 分钟检查未支付订单)。
    • Flower:监控任务队列和 Worker 状态(可视化界面)。

📂 三、项目结构示例

project/
├── app.py                # FastAPI 主应用
├── celery_app.py         # Celery 实例化配置
├── tasks.py              # Celery 任务定义
├── models.py             # Pydantic 数据模型
├── utils.py              # 辅助函数(如任务状态查询)
└── docker-compose.yml    # 容器化部署

🧩 四、关键代码实现

  1. Celery 初始化(celery_app.py)
from celery import Celery
celery = Celery("tasks", broker="redis://localhost:6379/0", backend="redis://localhost:6379/0")
celery.conf.update(timezone="Asia/Shanghai", task_serializer="json")
  1. FastAPI 集成 Celery(app.py)
from fastapi import FastAPI
from tasks import process_orderapp = FastAPI()@app.post("/order")
async def create_order(order_data: dict):task = process_order.delay(order_data)  # 异步触发任务return {"task_id": task.id, "status": "pending"}
  1. 任务状态查询接口(app.py)
from celery.result import AsyncResult@app.get("/task/{task_id}")
async def get_task_status(task_id: str):result = AsyncResult(task_id)return {"status": result.status, "result": result.result}
  1. Celery 任务定义(tasks.py)
@celery.task
def process_order(order_data: dict):# 模拟耗时操作(如支付校验)time.sleep(5)update_order_status(order_data["id"], "completed")return {"status": "success"}

🚀 五、分布式部署方案

Docker Compose 配置(docker-compose.yml)

services:fastapi:image: fastapi-appports: ["8000:8000"]redis:image: redis:latestcelery-worker:image: celery-appcommand: celery -A tasks worker --loglevel=info --concurrency=4celery-beat:image: celery-appcommand: celery -A tasks beat --loglevel=infoflower:image: celery-appcommand: celery -A tasks flower --port=5555ports: ["5555:5555"]

⚡️ 六、性能优化策略

  1. 任务重试机制
@celery.task(bind=True, max_retries=3)
def process_order(self, order_data):try:# 业务逻辑except Exception as e:self.retry(exc=e, countdown=30)  # 30秒后重试
  1. 动态任务路由
    • 为高优先级任务分配独立队列:
celery.conf.task_routes = {"critical_task": {"queue": "high_priority"}}
  1. 结果过期设置

    • 减少 Redis 存储压力:result_expires=3600(1小时自动清理)。
  2. 监控告警

    • 使用 Flower 实时监控任务失败率与队列积压。

💎 总结

FastAPI + Celery + Redis 的组合提供了从开发到部署的全栈分布式解决方案,尤其适用于订单处理、数据分析等异步场景。关键优势在于:

  • 解耦业务逻辑:API 响应与后台任务分离,避免阻塞。
  • 弹性扩展:通过增加 Celery Worker 应对流量高峰。
  • 生产就绪:定时任务、错误重试、监控等机制保障系统鲁棒性。
http://www.lryc.cn/news/620297.html

相关文章:

  • AR展厅在文化展示与传承领域的应用​
  • 嵌入式学习(day26)frambuffer帧缓冲
  • 嵌入式|VNC实现开发板远程Debian桌面
  • PG靶机 - Pelican
  • 飞凌OK3568开发板QT应用程序编译流程
  • 21. 抽象类和接口的区别
  • 【单板硬件】器件采购:BOM表
  • 大数据可视化设计 | 智能家居 UI 设计:从落地方法到案例拆解
  • 【从网络基础到实战】理解TCP/IP协议体系的核心要点(包含ARP协议等其他协议介绍)
  • 词向量转化
  • nginx知识点
  • C语言相关简单数据结构:顺序表
  • 使用 Simple Floating Menu 插件轻松实现浮动联系表单
  • Linux学习-UI技术
  • phpstudy搭建pikachu
  • 《探索C++ set与multiset容器:深入有序唯一性集合的实现与应用》
  • java中的各种引用
  • C++算法·递推递归
  • 从感知到执行:人形机器人低延迟视频传输与多模态同步方案解析
  • 飞算AI:企业智能化转型的新引擎——零代码重塑生产力
  • 音频重采样使用RandomOverSampler 还是 SMOTE
  • Python 基础语法(一)
  • Java研学-RabbitMQ(七)
  • 云计算-实战 OpenStack 私有云运维:服务部署、安全加固、性能优化、从服务部署到性能调优(含数据库、内核、组件优化)全流程
  • 《深入解析C++中的Map容器:键值对存储的终极指南》
  • FPGA+护理:跨学科发展的探索(四)
  • Java 大视界 -- 基于 Java 的大数据可视化在能源互联网全景展示与能源调度决策支持中的应用
  • Ubuntu24.04桌面版安装wps
  • 20250813比赛总结
  • Centos 用户管理