当前位置: 首页 > news >正文

Django定时任务框架django-apscheduler的使用

1.安装库

pip install django-apscheduler

2.添加 install_app    

django_apscheduler

3.在app下添加一个task.py文件,用来实现具体的定时任务

task.pydef my_scheduled_job():print("这个任务每3秒执行一次", time.time())

4.在app下创建一个management文件夹,里面包含一个空的__init__.py和一个文件夹commands,commands里面包含一个空的__init__.py和一个start_tasks.py,其中start_tasks.py是主要的启动脚本,结构如下:


from datetime import datetime
from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.date import DateTrigger
from apscheduler.triggers.interval import IntervalTrigger
from django.core.management.base import BaseCommand
from django_apscheduler.jobstores import DjangoJobStorefrom ...tasks import my_scheduled_job, my_scheduled_job1#
# Django manage.py命令:存储定时任务信息
#
class Command(BaseCommand):help = '启动定时任务.'def handle(self, *args, **options):# 调度器scheduler = BlockingScheduler()  # 研发阶段使用# scheduler = BackgroundScheduler() # 生产阶段使用# 任务存储scheduler.add_jobstore(DjangoJobStore(), 'default')# 配置线程池执行器,限制最大并发数为1,防止并发executor = ThreadPoolExecutor(max_workers=1)scheduler.executor = executor# 注册定义任务id_print_task = 'print_task__job'print('开始-增加任务({})'.format(id_print_task))scheduler.add_job(my_scheduled_job,id=id_print_task,name=id_print_task,max_instances=1,replace_existing=True,trigger=IntervalTrigger(seconds=15, start_date=datetime.now(), ),  # 从当前时间开始,每15秒钟调度一次)# 指定某个时间运行run_date = datetime(2024, 7, 18, 18, 33)scheduler.add_job(my_scheduled_job1,id='id_print_task1',name='id_print_task1',trigger=DateTrigger(run_date=run_date),  # 使用 DateTrigger 并设置 run_datereplace_existing=True  # 如果已存在同名任务,则替换它)# 指定每天某个时间运行scheduler.add_job(my_scheduled_job1,id='daily_task_at_18_36',name='Daily Task at 18:36',trigger=CronTrigger(hour=18, minute=38),  # 使用 CronTrigger 并设置小时和分钟replace_existing=True  # 如果已存在同名任务,则替换它)print('完成-增加任务({})'.format(id_print_task))# 启动定时任务try:scheduler.start()except KeyboardInterrupt:scheduler.shutdown()

5.修改settings.py配置,将时区改为中国,最主要是把USE两项注释掉,否则后面的定时任务的时间会晚八小时!!!!

settings.pyLANGUAGE_CODE = 'zh-hans'  # 中文语言代码TIME_ZONE = 'Asia/Shanghai'  # 中国上海时区# USE_I18N = True
#
# USE_TZ = True

6.添加完成后,做数据库迁移,会生成 表django_apscheduler_djangojob 和 表django_apscheduler_djangojobexecution

python manage.py migrate

7.启动项目,然后启动定时任务:

 python manage.py start_tasks

启动后,任务开始执行,数据库上述两张表会有数据进去,定时任务完成,其他的见后面拓展。

拓展:

django-apscheduler框架还提供了很多操作定时任务的函数。比如:

  • 删除任务
    scheduler.remove_job(job_name)
  • 暂停任务
    scheduler.pause_job(job_name)
  • 开启任务
    scheduler.resume_job(job_name)
  • 获取所有任务
    scheduler.get_jobs()
  • 修改任务
    scheduler.modify_job(job_name)

注:修改任务只能修改参数,如果要修改执行时间的话,有3种方法
第一就把任务删了重新创建,
第二直接操作数据库,
第三用到下面重设任务。

  • 重设任务
    scheduler.reschedule_job(job_name)

scheduler.reschedule_job(job_id="job1", trigger='interval', minutes=1)

http://www.lryc.cn/news/402389.html

相关文章:

  • 知识库文档处理,word转markdown
  • TF和TF-IDF区别和联系
  • 02线性表 - 链表
  • 高性能、安全、低碳绿色的趋势下,锐捷网络发布三擎云办公解决方案 3.0
  • python3 shutil排除特定或者模糊匹配文件或目录
  • Spire.PDF for .NET【文档操作】演示:如何在 C# 中切换 PDF 层的可见性
  • 新文件覆盖旧文件还能复原吗?八大excel文档修复软件免费
  • Android 10.0 Launcher3拖拽图标进入hotseat自适应布局功能实现一
  • 彻底解决idea的编解码问题
  • 仅两家!云原生向量数据库 PieCloudVector 全项通过信通院「可信数据库」评测
  • vue使用x6画流程图,简单使用
  • 低代码中间件学习体验分享:业务系统的创新引擎
  • 阿里云ACP云计算高级攻城狮通用知识
  • log4js node日志插件
  • 【MQTT(3)】开发一个客户端,QT-Android安卓手机版本
  • 大数据之数据抽取架构演变过程
  • [web]-反序列化-绕过__wakeup(转)
  • B树与B+树的区别
  • 机器人开源调度系统OpenTCS-6最新版本地源码运行
  • 云监控(华为) | 实训学习day3(10)
  • springMVC前后端请求参数绑定和传递
  • 【iOS】—— 消息传递和消息转发
  • 【Node.js】初识 Node.js
  • AWS backup服务和 RDS snapshot的关系
  • PDF转Word怎么快速转换?格式转换技巧分享
  • 浅谈:网络协议及网络连接
  • websocket-react使用
  • 【总结】nginx源码编译安装报错./configure: error: SSL modules require the OpenSSL library.
  • 昇思25天学习打卡营第15天|两个分类实验
  • 实践:Redis6.0配置文件解读