Celery在Django中的应用
Celery在Django中的应用
- 一、项目配置
- 二、异步任务
- 2.1 普通用法
- 2.1.1 通过delay
- 2.1.2 通过apply_async
- 2.2 高级用法
- 2.2.1 任务回调(Callback)
- 2.2.2 任务链(Chaining)
- 2.2.3 任务组(Group)
- 2.2.4 任务和弦(Chord)
- 三、定时任务
- 四、启动celery beat
- 五、监控管理
- 5.1 celery inspect
- 5.2 celery control
- 5.3 celery event
- 5.4 celery multi
- 5.5 celery purge
- 5.6 celery flower
一、项目配置
1.1 确认celery及django版本相对应,本文使用django3.2、celery5.5
1.2 创建一个名为CeleryStudy的django项目,以及一个名为test1_app,目录结构如下:
1.3 配置celery的setting参数(大部分不需要全局配置,可以针对tasks单独配置)
CELERY_BROKER_URL = 'redis://127.0.0.1:6379/0'
CELERY_RESULT_BACKEND = 'django-db' # django-db(使用 Django 数据库存储结果)
CELERY_ACCEPT_CONTENT = ['json'] # 指定 Celery 接受的任务序列化格式(避免反序列化安全问题)。
CELERY_TASK_SERIALIZER = 'json' # 指定任务的序列化方式
CELERY_RESULT_SERIALIZER = 'json' # 指定结果的序列化方式
CELERY_TIMEZONE = TIME_ZONE # 设置 Celery 的时区(影响定时任务的调度时间)
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler' # 指定定时任务调度器的后端
# 默认内存调度:celery.beat:PersistentScheduler(需配合 beat_schedule_filename)
# Redis 调度:celery.beat:RedisScheduler(需安装 celery-redis-scheduler)
CELERYD_CONCURRENCY = 4 # Worker 并发数(默认 CPU 核心数)
CELERY_BEAT_SCHEDULE = { # 一般在celery.py文件配置'every-10-seconds': {'task': 'myapp.tasks.debug','schedule': 10.0,},
}
CELERY_BEAT_MAX_LOOP_INTERVAL = 300 # 秒, Beat 调度器的最大循环间隔(默认 5 分钟)
CELERY_TASK_TIME_LIMIT = 300 # 硬超时 5 分钟(任务被强制终止)
CELERY_TASK_SOFT_TIME_LIMIT = 240 # 软超时 4 分钟(触发 `SoftTimeLimitExceeded`)
CELERY_TASK_DEFAULT_RETRY_DELAY = 60 # 任务重试间隔 1 分钟
CELERY_WORKER_LOG_FORMAT = '%(asctime)s [%(levelname)s] %(message)s' # 自定义 Worker 日志格式
...... # 等等等等等等, 还有一大堆配置
1.4 celery全局配置
# celery.pyimport os
from celery import Celeryos.environ.setdefault('DJANGO_SETTINGS_MODULE', 'CeleryStudy.settings') # 其作用是为 Django 提供配置文件的定位信息,确保框架能正确加载项目的各项设置app = Celery('CeleryStudy') # celery实例,一般命名为项目名称
app.config_from_object('django.conf:settings', namespace='CELERY') # celery实例从setting中CELERY开头的配置获取app.autodiscover_tasks() # 自动发现并注册项目中定义的tasks,会发现 @shared_task 和 @app.task
1.5 修改项目init文件,通过给外部导入
# __init__.pyfrom .celery import app as celery_app__all__ = ("celery_app",)
1.6 在app中新建tasks写入任务逻辑
# tasks.py
"""
@app.task 是“专属任务”,绑定到具体应用,适合简单场景。
@shared_task 是“共享任务”,解耦于应用,适合复杂架构。
"""写法一:
from celery import Celery
app = Celery('proj')@app.task # 绑定到当前 `app` 实例
def add(x, y):return x + y写法二:
import time
from celery import shared_task@shared_task
def test_add(x, y):time.sleep(2)return x + y@shared_task
def pre_task_test(x):# 定时任务return x
二、异步任务
2.1 普通用法
2.1.1 通过delay
# views.pyfrom django.http import HttpResponse
from .tasks import test_add# Create your views here.def test_celery(request):result = test_add.delay(1, 5)return HttpResponse(result.task_id + ' : ' + result.status)
2.1.2 通过apply_async
# countdown: 延迟执行(秒)。
# eta: 指定具体执行时间(datetime)。
# queue: 指定任务队列。
# expires: 任务过期时间。
# retry: 是否启用重试from datetime import datetime, timedelta# 延迟 10 秒执行
test_add.apply_async(args=(1, 5), countdown=10)
# 指定具体执行时间
test_add.apply_async(args=(1, 5), eta=datetime.now() + timedelta(minutes=1))
# 指定队列和过期时间
test_add.apply_async(args=(1, 5), queue='priority', expires=3600)
2.2 高级用法
通过 signature 对象调用,预生成任务签名(task.s()),用于创建一个可序列化的任务调用对象。它允许你预定义任务及其参数,而无需立即执行,从而支持更灵活的任务组合(如链式调用、组调用等),签名对象是可序列化的,可以存储到数据库或通过网络传递。
sig = test_add.s(1, 5) # 创建签名对象
sig.apply_async() # 异步执行
sig.delay() # 等价于 apply_async
2.2.1 任务回调(Callback)
在任务成功后触发另一个任务(通过 link 参数)
test_add.apply_async(args=(1, 5), link=send_notification.s("Task completed!"))
2.2.2 任务链(Chaining)
通过 | 符号或 chain() 将多个任务串联,前一个任务的结果作为后一个任务的输入
from celery import chain# 方法1:使用 | 符号
result = (task1.s(1, 2) | task2.s() | task3.s())()
# 方法2:使用 chain()
result = chain(task1.s(1, 2), task2.s(), task3.s())()
2.2.3 任务组(Group)
并行执行多个任务,等待所有任务完成
from celery import groupresult = group(task1.s(i) for i in range(10))() # 并发执行 10 个 task1
2.2.4 任务和弦(Chord)
先并行执行一组任务(group),全部完成后执行一个汇总任务
from celery import chordresult = chord((task1.s(i) for i in range(10)), task2.s())() # 10 个 task1 完成后执行 task2
三、定时任务
在celery文件中添加定时任务路由表
# celery.pyapp.conf.beat_schedule = {'task-name': { # 任务名称(自定义)'task': 'myapp.tasks.my_task', # 任务函数路径(需可导入)'schedule': 30, # 执行时间规则(固定间隔)# 或 'schedule': crontab(minute='*/5'), # Cron 表达式'args': (16, 16), # 传递给任务的参数(可选)'options': {'queue': 'priority'}, # 其他选项(如指定队列)},# 可定义多个任务
}
通过安装pip install django-celery-beat
可以实现在admin后台动态修改定时任务配置
INSTALLED_APPS = [...,'django_celery_beat',
]# 替换 Celery 的调度器
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
配置完记得迁移数据库 python manage.py migrate
四、启动celery beat
worker = 干活的(执行任务)。
beat = 发任务的(定时生成任务)。
协作关系:beat 是“计划部门”,worker是“执行部门”,两者通过 Broker(消息队列)解耦。
生产建议:分开启动,Worker 可横向扩展,Beat 保持单例
# 启动 Worker(处理任务)
celery -A CeleryStudy worker -l infocelery -A CeleryStudy worker -l info -P eventlet # windows环境下命令
"""
prefork 是 Celery 在 Linux 上的默认并发模型,它使用多进程(Multiprocessing)处理任务,
适合 CPU 密集型场景。但 Windows 系统不支持 fork() 系统调用,因此无法使用 prefork 池。
在 Windows 上尝试使用 prefork 会直接报错,导致 Worker 无法启动。eventlet 是一个基于协程(Coroutine)的并发库,通过绿色线程(Green Thread)实现高并发,
适合 I/O 密集型任务(如 Celery 的异步任务场景)。
在 Windows 上,eventlet 是少数可用的高性能并发池之一。
它通过非阻塞 I/O 和协程调度,避免了线程切换的开销,同时绕过了 GIL 的限制,
能显著提升任务处理效率
"""# 启动 Beat(调度任务)
celery -A CeleryStudy beat -l info# 合并启动
celery -A CeleryStudy worker --beat -l info
五、监控管理
5.1 celery inspect
作用:检查 Worker 状态、任务信息等(无需停止服务)。
celery -A proj inspect active # 查看正在执行的任务
celery -A proj inspect registered # 查看已注册的任务列表
celery -A proj inspect scheduled # 查看待执行的定时任务(需 Beat 运行)
celery -A proj inspect reserved # 查看 Worker 已获取但未执行的任务
celery -A proj inspect stats # 查看 Worker 统计信息(如任务处理数)
5.2 celery control
作用:动态控制 Worker 行为(如关闭、重启、调整并发数)。
celery -A proj control shutdown # 优雅关闭所有 Worker
celery -A proj control add_consumer Q1 # 动态添加监听队列 Q1
celery -A proj control cancel_consumer Q1 # 动态移除监听队列 Q1
celery -A proj control pool_grow 10 # 增加 Worker 并发数到 10
celery -A proj control pool_shrink 5 # 减少 Worker 并发数到 5
5.3 celery event
作用:监控 Celery 事件(如任务开始、成功、失败),可用于自定义仪表盘。
celery -A proj events # 启动事件监控(输出到终端)
celery -A proj events -d dump # 以 JSON 格式输出事件
celery -A proj events -f events.log # 将事件记录到文件
5.4 celery multi
作用:同时启动多个 Worker 或 Beat 实例(适用于分布式部署)。
celery multi start w1 w2 -A proj -l info -Q high,low # 启动两个 Worker,分别监听不同队列
celery multi stop w1 w2 # 停止指定 Worker
5.5 celery purge
作用:清空消息队列中的所有任务
celery -A proj purge -Q celery # 清空默认队列
celery -A proj purge -Q high,low # 清空多个队列
5.6 celery flower
作用:启动基于 Web 的监控仪表盘(需单独安装 flower 包)。
pip install flower
celery -A CeleryStudy flower --port=5555 # 访问 http://localhost:5555