当前位置: 首页 > news >正文

pyhton django web集群基于linux定时任务

基于django management/commands目录下的脚本

from django.core.management import BaseCommand
import logging
import uuid
from pia.utils.cache import reset_redis_expire
from pia.utils.reids_key import TASK_KEYlogging = logging.getLogger('task')"""
定时任务: 定时任务 看门狗续时
"""class Command(BaseCommand):def add_arguments(self, parser):parser.add_argument('trace_id', type=str)def handle(self, *args, **options):trace_id = options["trace_id"]if not trace_id:trace_id = str(uuid.uuid4())self.trace_id = trace_idself.writeLog('watchdog continuation start')reset_redis_expire(TASK_KEY + "privacy_detect", 80)reset_redis_expire(TASK_KEY + "privacy_analysis", 80)reset_redis_expire(TASK_KEY + "privacy_notify", 80)reset_redis_expire(TASK_KEY + "tx_organization", 80)reset_redis_expire(TASK_KEY + "send_message", 80)reset_redis_expire(TASK_KEY + "todo_notify", 80)reset_redis_expire(TASK_KEY + "thirdparty_ex_notify", 80)reset_redis_expire(TASK_KEY + "cookies_scan", 80)reset_redis_expire(TASK_KEY + "xuanwu_app_scan", 80)self.writeLog('watchdog continuation end')def writeLog(self, msg: str):logging.info(f'[{self.trace_id}] {msg}')
import sys
import os
from pathlib import Path
from tasks import CrontabTask# 添加 Django 项目的根目录到 Python 路径
django_project_path = Path(__file__).resolve().parent.parent.parent
sys.path.append(str(django_project_path))
# 设置 DJANGO_SETTINGS_MODULE 环境变量
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'tencheck.settings')
if __name__ == '__main__':"""总入口 脚本启动信息"""# crontab任务 redis keyredis_key = sys.argv[1]# crontab任务名称task_name = sys.argv[2]# 环境yaml文件env = sys.argv[3]env_name = sys.argv[4]config_name = sys.argv[5]rainbow_key = sys.argv[6]os.environ.setdefault('ENV_NAME', env_name)os.environ.setdefault('CONFIG_NAME', config_name)os.environ.setdefault('RAINBOW_KEY', rainbow_key)task = CrontabTask(env=env, redis_key=redis_key, task_name=task_name)task.start()
from pathlib import Path
import os
import sys
import logging
import time
import traceback
import uuid
import yaml
from django.core.cache import cacheTASK_KEY = "pia:crontab:"class BaseTask:def __init__(self, *args, **kwargs):self.env = kwargs.get('env')self.task_name = kwargs.get('task_name')self.redis_key = kwargs.get('redis_key')self.workspace = Path(__file__).resolve().parent.parent.parentself.config = self.load_config()self.pid = os.getpid()self.trace_id = str(uuid.uuid4())def load_config(self):""" 读取配置文件 """config = {}with open(os.path.join(self.workspace, f"task/conf/config-{self.env}.yaml"), 'r') as f:config = yaml.safe_load(f)return configdef writelog(self, text: str) -> None:""" 记录日志 """log_path = self.config['log_path']formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(name)s - %(message)s")file_handler = logging.FileHandler(log_path + "/crontab_task.log")  # 日志目录file_handler.setFormatter(formatter)logger = logging.getLogger()logger.addHandler(file_handler)logger.setLevel(logging.DEBUG)text = time.strftime(f'[PID:{self.pid}] ', time.localtime(time.time())) + textlogger.info(f'[{self.trace_id}] ' + text)  # 写入日志logger.removeHandler(file_handler)file_handler.close()def script_err_report(push_user: list, info: str) -> None:""" 脚本告警信息 """# writelog(info)passdef start_crontab(self) -> int:"""redis控制脚本执行"""try:redis_key = self.redis_key"""保证原子性 True if lock acquired, False otherwise"""task_status = cache.add(TASK_KEY + redis_key, True, 80)if task_status:self.writelog(f"task  set redis key {TASK_KEY + redis_key}")return 0else:self.writelog(f"task redis key {TASK_KEY + redis_key} exists")return 1except Exception:self.writelog(f"redis connect fail {traceback.format_exc()}")def end_crontab(self) -> None:""" 清除redis信息 """try:redis_key = self.redis_keycache.delete(TASK_KEY + redis_key)self.writelog(f"clean redis key  {TASK_KEY + redis_key}")except Exception:self.writelog(f"redis connect fail {traceback.format_exc()}")def start(self):start_time = time.time()execute = Falsetry:self.writelog(f"task {self.task_name} start")if self.start_crontab() != 0:error_info = f"task {self.task_name} not finish!"self.writelog(error_info)sys.exit(0)execute = Trueself.do_work()except Exception as e:result_err_info = traceback.format_exc()self.writelog(f"{self.task_name} ERROR: {result_err_info}")# 告警# self.script_err_report(SCRIPT_ERR_REPORT_USER, result_err_info)finally:end_time = time.time()self.writelog(f"{self.task_name} cost time {end_time - start_time}")if execute:self.end_crontab()def do_work(self):""" 脚本核心功能主流程 """# self.writelog("privacy analysis task start")# cmds = [f'cd {WORKSPACE_DIR} && {self.config["privacy_detect"]["python_path"]} manage.py privacy_analysis {self.trace_id}']# retcode = subprocess.call(cmds, shell=True)# self.writelog(f"privacy analysis task end retcode:{retcode}")
import subprocess
from base_task import BaseTaskclass CrontabTask(BaseTask):def do_work(self):config = self.configtask_name = self.task_nameworkspace = self.workspaceself.writelog(f"{task_name} task start")cmds = [f'cd {workspace} && {config["python_path"]} manage.py {task_name} {self.trace_id}']retcode = subprocess.call(cmds, shell=True)self.writelog(f"{task_name} task end retcode:{retcode}")

将任务添加到Linux服务器的 crontab里面

# argv[1]------redis_key ; argv[2]------task_name即 commands目录下 crontab_前缀的文件 ; argv[3]------配置文件 ; argv[4]------环境变量 ; argv[5]------组  ; argv[6]------通道*/1 * * * * cd /usr/local/oit/tencheck && ../env/bin/python3.11 task/script/start_task.py watchdog crontab_watchdog tx xx xx xx >/dev/null 2>&1 &

http://www.lryc.cn/news/487679.html

相关文章:

  • 探索 Python 字典的奥秘:Future 对象为何能成为字典的键?
  • 多品牌摄像机视频平台EasyCVR视频融合平台+应急布控球:打造城市安全监控新体系
  • Spark 中 RDD checkpoint 是通过启动两个独立的 Job 完成的。
  • 如何下载TikTok视频没有水印
  • 天童美语:提升孩子的自信心的方法
  • 【网络编程】字节序:大端序和小端序
  • 视频融合×室内定位×数字孪生
  • RK3568平台开发系列讲解(platform虚拟总线驱动篇)注册 platform 驱动
  • Jmeter进阶篇(26)杀掉Tomcat的几种方法
  • Solana 区块链的技术解析及未来展望 #dapp开发#公链搭建
  • SMO算法-核方法支持向量机
  • Java项目实战II基于微信小程序的科创微应用平台(开发文档+数据库+源码)
  • HTTP代理是什么,有什么用?
  • Postman之newman
  • 数据库查询表结构和数据量以及占用空间
  • android 性能分析工具(03)Android Studio Profiler及常见性能图表解读
  • vscode 执行 vue 命令无效/禁止运行
  • C++语言系列-STL容器和算法
  • 【Web前端】Promise的使用
  • TDK推出第二代用于汽车安全应用的6轴IMU
  • 免费S3客户端工具大赏
  • 前端访问后端实现跨域
  • TCP和UDP通信基础
  • 微服务中的技术使用与搭配:如何选择合适的工具构建高效的微服务架构
  • 找出字符串第一个匹配项的下标
  • 面向FWA市场!移远通信高性能5G-A模组RG650V-NA通过北美两大重要运营商认证
  • Matlab实现北方苍鹰优化算法优化随机森林算法模型 (NGO-RF)(附源码)
  • 搭建环境 配置编译运行 mpi-test-suite
  • 夜神模拟器启动报错:虚拟机启动失败 请进行修复 关闭hyper-v
  • 投资策略规划最优决策分析