当前位置: 首页 > news >正文

celery/schedules.py源码精读

BaseSchedule类

基础调度类,它定义了一些调度任务的基本属性和方法。以下是该类的主要部分的解释:

  • __init__(self, nowfun: Callable | None = None, app: Celery | None = None):初始化方法,接受两个可选参数,nowfun表示返回当前时间的函数,app表示Celery应用程序的实例。
  • now(self) -> datetime:返回当前时间的方法。它使用了nowfun属性或者Celery应用程序的now方法。
  • remaining_estimate(self, last_run_at: datetime) -> timedelta:估计下一次运行的时间间隔。这是一个抽象方法,需要在子类中实现。
  • is_due(self, last_run_at: datetime) -> tuple[bool, datetime]:判断任务是否到期的方法。这也是一个抽象方法,需要在子类中实现。
  • maybe_make_aware(self, dt: datetime, naive_as_utc: bool = True) -> datetime:将时间转换为可能的时区感知时间。它使用了Celery应用程序的时区信息。
  • app属性:获取或设置与调度相关联的Celery应用程序实例。
  • tz属性:获取或设置调度任务的时区信息。
  • utc_enabled属性:获取或设置UTC是否启用的信息。
  • to_local(self, dt: datetime) -> datetime:将时间转换为本地时间。如果UTC启用,则将时间转换为本地时间;否则,使用回退方式将时间转换为本地时间。
  • __eq__(self, other: Any) -> bool:判断两个调度任务是否相等的方法。比较的依据是nowfun属性。

这个类提供了一些基本的调度任务功能,并定义了一些抽象方法,具体的调度任务类需要继承这个基类并实现这些抽象方法。

schedule类

用于表示Celery中的周期性任务调度

  1. 构造函数 (__init__ 方法):

    • 通过构造函数接受一些参数,包括 run_every(任务的运行间隔时间)、relative(一个布尔值,表示是否将运行时间舍入到间隔的分辨率)、nowfun(返回当前日期和时间的函数)、app(Celery应用实例)。
    • 使用 super().__init__(nowfun=nowfun, app=app) 调用父类(BaseSchedule)的构造函数。
  2. remaining_estimate 方法:

    • 计算距离下一次运行任务的剩余时间。
    • 使用 remaining 函数,传递上次运行时间、运行间隔、当前时间以及 relative 标志。
    • relative =True,当前时间是9.36,则会舍入到最近的30分的倍数10:00第一次执行,然后10:30,11:30…
    • relative =False则不做舍入,当前时间是9.36,则9.36第一次执行,然后10:06,11:36…
  3. is_due 方法:

    • 根据上次运行时间确定任务是否应该运行。
    • 返回一个包含布尔值(is_due,表示任务是否应该运行)和下一次检查的时间(以秒为单位)的元组。
  4. __repr__ 方法:

    • 返回对象的字符串表示,指示任务的频率以人类可读的格式显示。
  5. __eq__ 方法:

    • 实现相等性比较,允许对此类的实例进行比较。
  6. __reduce__ 方法:

    • 用于序列化。返回一个元组,可用于重新创建对象。
  7. seconds 属性:

    • 返回运行间隔的总秒数。
  8. human_seconds 属性:

    • 返回表示运行间隔秒数的人类可读字符串。

这个类看起来设计得很好,用于定义Celery框架中任务的周期性调度。它包括用于估算剩余时间、检查任务是否应该运行以及提供可读的调度表示的方法。

crontab_parser类

用于解析类似于Crontab表达式的字符串。Crontab表达式通常用于指定计划任务(定时任务)运行的时间规则。下面是对该类的主要解释:

  • 构造函数 (__init__):

    • 初始化 crontab_parser 对象。可以通过构造函数指定最大值和最小值,默认为 max_=60min_=0
    • 构造函数还设置了一系列正则表达式和相应的处理函数,这些正则表达式用于匹配不同的 Crontab 表达式模式。
  • parse 方法:

    • 接受一个 Crontab 表达式字符串,返回一个包含时间单位的集合。例如,parse('*/15') 返回 [0, 15, 30, 45]
  • 私有方法 (_parse_part, _expand_range, _range_steps, _star_steps, _expand_star, _expand_number):

    • 这些方法用于解析和展开不同类型的 Crontab 表达式部分。
    • _parse_part 方法遍历已解析的 Crontab 表达式部分,使用正则表达式匹配相应的模式,并调用相应的处理函数。
    • _expand_range 方法用于展开范围,例如 1-5 展开为 [1, 2, 3, 4, 5]
    • _range_steps 方法处理带步长的范围,例如 1-5/2 展开为 [1, 3, 5]
    • _star_steps 方法处理带步长的星号,例如 */2 展开为 [0, 2, 4, ..., max_]
    • _expand_star 方法展开星号,例如 * 展开为 [min_, min_+1, ..., max_]
    • _expand_number 方法用于展开数字,同时检查数字的有效性。

这个类的目的是解析 Crontab 表达式中的不同部分,将其展开为对应的时间单位集合。通过这样的解析,可以获取计划任务运行的时间规则。

crontab类

Celery 中 Crontab 调度的实现,用于定义基于类似于 cron 的时间表的任务执行规则,主要负责解析 Crontab 表达式,计算下一次任务运行的时间,以及判断任务是否应该运行。Crontab 表达式中的星号和数字表示通配符,可以非常灵活地定义任务的运行时间。

class crontab(BaseSchedule):# ...(文档字符串中有详细的描述)def __init__(self, minute: str = '*', hour: str = '*', day_of_week: str = '*',day_of_month: str = '*', month_of_year: str = '*', **kwargs: Any) -> None:# 初始化 Crontab 实例,接受分钟、小时、星期几、每月的第几天和每年的第几月等参数# 参数可以是数字、字符串(Crontab 表达式)或可迭代对象(例如列表、集合)# 使用 crontab_parser 来解析字符串表达式# _expand_cronspec 方法用于展开 cronspec,确保值在合理范围内def _expand_cronspec(cronspec: int | str | Iterable,max_: int, min_: int = 0) -> set[Any]:# 展开 cron 规范,将字符串表达式转换为一个集合,表示 Crontab 触发的所有时间单位值def _delta_to_next(self, last_run_at: datetime, next_hour: int,next_minute: int) -> ffwd:# 找到下一个 delta(时间间隔),用于计算下一次调度的时间# 主要用于在限制任务执行的 day_of_month 和/或 month_of_year cronspec 时def __repr__(self) -> str:# 返回 Crontab 的字符串表示形式def remaining_delta(self, last_run_at: datetime, tz: tzinfo | None = None,ffwd: type = ffwd) -> tuple[datetime, Any, datetime]:# 计算距离下一次执行的时间间隔# 用于 day_of_month 和/或 month_of_year cronspec 的任务调度def remaining_estimate(self, last_run_at: datetime, ffwd: type = ffwd) -> timedelta:# 估算下一次运行时间,返回 timedeltadef is_due(self, last_run_at: datetime) -> tuple[bool, datetime]:# 返回一个元组,表示任务是否应该运行和下一次运行的时间# 考虑了 beat_cron_starting_deadline 配置,确保在 deadline 内执行def __eq__(self, other: Any) -> bool:# 判断两个 Crontab 实例是否相等

maybe_schedule函数

用于将输入参数转换为 Celery 中的调度对象(BaseSchedule 类的实例),输入参数可以是整数(表示秒数)、浮点数(表示秒数)、timedelta 对象(表示时间间隔)或者已有的调度对象。如果输入是整数或浮点数,会先将其转换为 timedelta 对象,然后创建一个调度对象。如果输入已经是 timedelta 对象,则直接创建一个调度对象。如果输入是已有的调度对象,则设置其 Celery app 属性后返回。

def maybe_schedule(s: int | float | timedelta | BaseSchedule, relative: bool = False,app: Celery | None = None) -> float | timedelta | BaseSchedule:"""Return schedule from number, timedelta, or actual schedule."""if s is not None:if isinstance(s, (float, int)):# 如果输入是整数或浮点数,将其转换为 timedelta 对象s = timedelta(seconds=s)if isinstance(s, timedelta):# 如果输入是 timedelta 对象,创建一个 schedule 实例return schedule(s, relative, app=app)else:# 如果输入是 BaseSchedule 的实例,设置其 Celery app 属性s.app = appreturn s

solar类

用于处理太阳事件的 Celery 调度器的类。允许你创建一个周期性任务,该任务将根据特定的太阳事件进行调度。太阳事件的种类包括黎明、日出、日中、日落和黄昏等,你可以选择其中一个事件作为任务的触发条件。在初始化时,你需要指定事件类型(event)、观察者的纬度(lat)和经度(lon),以及其他一些可选参数。类中的方法包括 remaining_estimate(返回下一次运行的时间估计)和 is_due(返回任务是否应该运行及下一次运行的时间)。此外,还有一些辅助性的属性和方法,用于表示太阳事件的集合、地平线高度、计算方法等。

class solar(BaseSchedule):"""Solar event.A solar event can be used as the ``run_every`` value of aperiodic task entry to schedule based on certain solar events....Arguments:event (str): Solar event that triggers this task.See note for available values.lat (float): The latitude of the observer.lon (float): The longitude of the observer.nowfun (Callable): Function returning the current date and timeas a class:`~datetime.datetime`.app (Celery): Celery app instance."""_all_events = {...}  # 一组表示所有可能太阳事件的字符串集合_horizons = {...}  # 一组表示各种太阳事件时,地平线的高度_methods = {...}  # 一组表示用于计算太阳事件的方法_use_center_l = {...}  # 一组表示是否使用太阳中心计算的布尔值def __init__(self, event: str, lat: int | float, lon: int | float, **kwargs: Any) -> None:# 初始化太阳事件调度器...def remaining_estimate(self, last_run_at: datetime) -> timedelta:"""Return estimate of next time to run.Returns:~datetime.timedelta: when the periodic task shouldrun next, or if it shouldn't run today (e.g., the sun doesnot rise today), returns the time when the next checkshould take place."""# 返回下一次运行的时间估计...def is_due(self, last_run_at: datetime) -> tuple[bool, datetime]:"""Return tuple of ``(is_due, next_time_to_run)``.Note:next time to run is in seconds.See Also::meth:`celery.schedules.schedule.is_due` for more information."""# 返回是否应该运行及下一次运行的时间...def __eq__(self, other: Any) -> bool:if isinstance(other, solar):return (other.event == self.event andother.lat == self.lat andother.lon == self.lon)return NotImplemented
http://www.lryc.cn/news/261643.html

相关文章:

  • 单片机上位机(串口通讯C#)
  • 初识Flask
  • JeecgBoot jmreport/queryFieldBySql RCE漏洞复现
  • 机器学习---模型评估
  • 【机器学习】应用KNN实现鸢尾花种类预测
  • ACL和NAT
  • MX6ULL学习笔记(十二)Linux 自带的 LED 灯
  • Qt容器QToolBox工具箱
  • 华为实训课笔记
  • 基于java 的经济开发区管理系统设计与实现(源码+调试)
  • 外包干了3个月,技术退步明显。。。
  • 详细教程 - 从零开发 Vue 鸿蒙harmonyOS应用 第一节
  • R语言对医学中的自然语言(NLP)进行机器学习处理(1)
  • 什么是CI/CD?如何在PHP项目中实施CI/CD?
  • 玩转Docker(四):容器指令、生命周期、资源限制、容器化支持、常用命令
  • 回归预测 | MATLAB实现CHOA-BiLSTM黑猩猩优化算法优化双向长短期记忆网络回归预测 (多指标,多图)
  • Qt/C++视频监控安卓版/多通道显示视频画面/录像存储/视频播放安卓版/ffmpeg安卓
  • 【docker】容器使用(Nginx 示例)
  • 【QT】时间日期与定时器
  • 蓝桥杯专题-真题版含答案-【古代赌局】【古堡算式】【微生物增殖】【密码发生器】
  • 和鲸科技携手深圳数据交易所,“数据+数据开发者生态”赋能人工智能产业发展
  • 在MFC(Microsoft Foundation Classes)中 CreateThread函数
  • Ubuntu 常用命令之 ls 命令用法介绍
  • 【解决】Windows 11检测提示电脑不支持 TPM 2.0(注意从DTPM改为PTT)
  • ChatGPT 也宕机了?如何预防 DDOS 攻击的发生
  • wireshark下载安装
  • 如何退回chrome旧版ui界面?关闭Chrome浏览器新 UI 界面
  • 指针进阶篇
  • C语言之单链表理解与应用
  • SpringBoot对PDF进行模板内容填充、电子签名合并