当前位置: 首页 > news >正文

【最后203篇系列】007 使用APS搭建本地定时任务

说明

最大的好处是方便。

其实所有任务的源头,应该都是通过定时的方式,在每个时隙发起轮询。当然在任务的后续传递中,可以通过CallBack或者WebHook的方式,以事件的形态进行。这样可以避免长任务执行的过程中进行等待和轮询。

总结一下:源头是定时轮询,中间过程是事件传递。

本次使用APS搭建本地定时任务的目的是为了简化实验性质的定时任务,通过在git项目下进行编辑任务脚本和执行任务清单,而运行容器本身会周期性的自动拉取代码,然后按照任务清单执行。

执行过程采用多线程方式,任务的负载通常都不高。整体设计上,复杂和繁重的任务会包在微服务中,定时任务主要是向这些微服务发起触发动作。通常,微服务收到触发元信息后进行自动的任务/数据拉取处理,处理完毕后通过webhook将结果持久化,或进一步发起其他的触发动作。

另外,具有共性的任务将会被提取出来,之后会交给celery以分布&协程方式执行,这些任务包括:

  • 1 数据库IO。例如从队列里取数,存到数据库中。
  • 2 网络数据获取IO。爬取网页、或者通过接口,获取数据。
  • 3 接口化标准操作。按url, json input这样的标准web请求,这种灵活性很强。表面上是一个IO动作,但背后可能触发密集计算,但是又不需要celery集群承担。(可能是ray集群、dask集群、基于显卡计算的集群)

内容

1 读取任务列表

主要为了简单的读入任务(脚本),同时可以方便的进行注释

# 用于将代表任务列表的数据读入
# 去掉换行和空格
# 如果以# 号开头表示注释
def read_all_lines_clean(fpath):with open(fpath, 'r') as f:lines = f.readlines()lines1 = [x.replace('\n','').strip() for x in lines]lines2 = [x for x in lines1 if len(x) and not x.startswith('#')]return lines2

任务文件如下task_list.txt

task_01_probably_git_pull.py
task_02_del_event_null_recs.py
# task_03_sync_xs_backup.py
#task_04_rotate_data.py
# task_05_sync_milvus.py
#task_06_rotate_mysql_time.py

读入后

In [4]: a = read_all_lines_clean('task_list.txt')In [5]: a
Out[5]: ['task_01_probably_git_pull.py', 'task_02_del_event_null_recs.py']

这些就是之后要定时调度的任务

2 并行执行

为了使得每一次定时任务都可以执行,且保证效率,需要用一些简单的调度(容错问题均在脚本内解决)。调度器可以保证每30秒起来一次。

线程的并行执行:

def exe_tasks_threads(task_list_file = base_config.task_list_file, project_folder = base_config.project_folder):tasks = read_all_lines_clean(project_folder + task_list_file)dedup_tasks = remove_duplicates_preserve_order(tasks)pytask_list = [ {'some_path':base_config.project_folder+x} for x in dedup_tasks]thread_concurrent_run(os_system_python, keyword_args_list=pytask_list, max_workers =50)

每一次执行os_system_python

import subprocessdef os_system_python(some_path=None, timeout=30):try:result = subprocess.run(['python3', some_path], timeout=timeout)return resultexcept subprocess.TimeoutExpired:print(f"Task {some_path} timed out after {timeout} seconds.")return None'''
代码说明
subprocess.run:这是 subprocess 模块的高级 API,用于运行命令并等待其完成。它支持 timeout 参数,如果命令在指定时间内未完成,会抛出 TimeoutExpired 异常。timeout 参数:你设置了默认超时时间为 30 秒,这是一个合理的默认值。如果任务在 30 秒内未完成,subprocess.run 会抛出 TimeoutExpired 异常。异常处理:捕获 TimeoutExpired 异常后,打印超时信息并返回 None。这样可以避免程序因超时而崩溃,同时提供清晰的日志信息。
'''

3 自动更新

更新git项目,作为一个任务脚本被周期执行。由于代码更新并不是高频事件,所以一般概率上保证5分钟会更新一次代码。

(base) root@76a14afa199b:/workspace/local_aps_v2/base# python3 task_01_probably_git_pull.py
2000-01-01 08:00:00
2000-01-01 08:00:00
2025-01-29 19:47:19 - httpx - INFO - HTTP Request: POST http://192.168.0.4:24132/send_msg/ "HTTP/1.1 200 OK"
task_01_probably_git_pull running
2025-01-29 19:47:19 - httpx - INFO - HTTP Request: POST http://192.168.0.4:24132/send_msg/ "HTTP/1.1 200 OK"
Git pull executed successfully for branch 'master':
Already up to date.2025-01-29 19:47:19 - httpx - INFO - HTTP Request: POST http://192.168.0.4:24132/send_msg/ "HTTP/1.1 200 OK"
(base) root@76a14afa199b:/workspace/local_aps_v2/base#

4 定时调度

调度器在每分钟的0/30秒执行,我把30秒定为一拍(pace),一分钟定位一时隙(slot)。绝大部分任务都应该在30秒内完成。

# 执行本地脚本
from datetime import datetime
import os
from apscheduler.schedulers.blocking import BlockingSchedulerfrom base_config import base_config
from Basefuncs import * 
def exe_tasks_threads(task_list_file = base_config.task_list_file, project_folder = base_config.project_folder):tasks = read_all_lines_clean(project_folder + task_list_file)dedup_tasks = remove_duplicates_preserve_order(tasks)pytask_list = [ {'some_path':base_config.project_folder+x} for x in dedup_tasks]thread_concurrent_run(os_system_python, keyword_args_list=pytask_list, max_workers =50)# 后台启动命令 nohup python3 /root/prj27_timetask/cron_task/test_001.py >/dev/null 2>&1 &if __name__ == '__main__':# 创建调度器sche1 = BlockingScheduler()# 添加任务,使用 cron 表达式每分钟的第 0 秒和第 30 秒执行sche1.add_job(exe_tasks_threads,'cron',second='0,30',  # 每分钟的第 0 秒和第 30 秒kwargs={},coalesce=True,max_instances=1)print('[S] Starting scheduler with cron (0s and 30s of every minute)...')try:sche1.start()  # 启动调度器except (KeyboardInterrupt, SystemExit):print('[S] Scheduler stopped.')

5 Docker运行

为了保证执行的稳定性,使用docker执行

docker run -d --name=local_aps_v2 \--restart=always \-v /etc/localtime:/etc/localtime -v /etc/timezone:/etc/timezone -v /etc/hostname:/etc/hostname -e "LANG=C.UTF-8" \-w /workspace/local_aps_v2/base \YOURIMAGE  \sh -c "git pull && python3 aps.py"

只有环境改变时才需要修改镜像重发布,大部分时候只要调试和修改代码,然后推送就可以了。

http://www.lryc.cn/news/528665.html

相关文章:

  • go gin配置air
  • Java定时任务实现方案(五)——时间轮
  • 【事务管理】
  • Highcharts 柱形图:深入解析与最佳实践
  • js笔记(黑马程序员)
  • Mac m1,m2,m3芯片使用nvm安装node14报错
  • LeetCode:63. 不同路径 II
  • 安装zsh并美化
  • 读量子霸权18读后总结与感想兼导读
  • 统计学中的样本概率论中的样本
  • HTML 符号详解
  • 蓝桥杯练习日常|c/c++竞赛常用库函数(下)
  • Python vLLM 实战应用指南
  • .NET MAUI 入门学习指南
  • JavaScript系列(49)--游戏引擎实现详解
  • AI如何帮助解决生活中的琐碎难题?
  • K8s运维管理平台 - KubeSphere 3.x 和4.x 使用分析:功能较强,UI美观
  • 芯片AI深度实战:基础篇之langchain
  • WordPress使用(1)
  • 单机伪分布Hadoop详细配置
  • 【高内聚】设计模式是如何让软件更好做到高内聚的?
  • 10.2 目录(文件夹)操作
  • LiteFlow Spring boot使用方式
  • OSCP:Windows 服务提权详解
  • 星火大模型接入及文本生成HTTP流式、非流式接口(JAVA)
  • 21.Word:小赵-毕业论文排版❗【39】
  • Python中的函数(上)
  • Windows11 安装poetry
  • 浅谈Linux 权限、压缩、进程与服务
  • 006 LocalStorage和SessionStorage