【Python】定时器快速实现
分为内置方法 与 第三方库的方法
1.sleep法(阻塞)
通过 while + sleep 实现定时任务
(1) 存在时间漂移(等待运行时间)
import time
def loopMonitor():while True:MonitorSystem()# 1min检查一次time.sleep(60)
loopMonitor()
(2) 维护sleep时间
import time
def loopMonitor():while True:MonitorSystem()#2s检查一次time.sleep(60.0 - ((time.time() - starttime) % 60.0))
loopMonitor()
2.自定义法 (+线程、非阻塞)
import threading
import timeclass RepeatedTimer(object):def __init__(self, interval, function, *args, **kwargs):self._timer = Noneself.interval = intervalself.function = functionself.args = argsself.kwargs = kwargsself.is_running = Falseself.next_call = time.time()self.start()def _run(self):self.is_running = Falseself.start()self.function(*self.args, **self.kwargs)def start(self):if not self.is_running:self.next_call += self.intervalself._timer = threading.Timer(self.next_call - time.time(), self._run)self._timer.start()self.is_running = Truedef stop(self):self._timer.cancel()self.is_running = Falsefrom time import sleep
def hello(name):print "Hello %s!" % nameprint "starting..."
rt = RepeatedTimer(1, hello, "World") # it auto-starts, no need of rt.start()
try:sleep(5) # your long-running job goes here...
finally:rt.stop() # better in a try/finally block to make sure the program ends!
3.Twisted
更加健壮,并且实现了许多功能:守护进程、日志记录或异常处理
from twisted.internet import task, reactortimeout = 60.0 # Sixty secondsdef doWork():#do work herepassl = task.LoopingCall(doWork)
l.start(timeout) # call every sixty secondsreactor.run()
4.schedule (简单方便,推荐)
import schedule
import timedef job():print("I'm working...")#schedule.every(1)创建Job, seconds.do(func)按秒间隔查询并执行
schedule.every(1).seconds.do(func)
#添加任务按分执行
schedule.every(1).minutes.do(func)
#添加任务按天执行
schedule.every(1).days.do(func)
#添加任务按周执行
schedule.every().weeks.do(func)
#添加任务每周一执行,执行时间为下周一这一时刻时间
schedule.every().monday.do(func)schedule.every().monday.at("12:00").do(job)
# 每隔10分钟执行一次任务
schedule.every(10).minutes.do(job)
# 每隔一小时执行一次任务
schedule.every().hour.do(job)
# 每天10:30执行一次任务
schedule.every().day.at("10:30").do(job)
# 每隔5到10分钟运行一次任务?
schedule.every(5).to(10).minutes.do(job)
# 每周一的这个时候执行一次任务
schedule.every().monday.do(job)
# 每周三 13:15执行一次任务
schedule.every().wednesday.at("13:15").do(job)
# # 每分钟的第17秒执行任务
schedule.every().minute.at(":17").do(job)while True:schedule.run_pending() # 检测是否有到期任务time.sleep(1) # 避免一直查询cpu过高。sleep时间不应该小于执行时间,否则会线程堆积。
5.Apscheduler (功能多,推荐)
调度器(scheduler)
- BlockingScheduler: 调度器在当前进程的主线程中运行,会阻塞当前线程。
- BackgroundScheduler: 调度器在后台线程中运行,不会阻塞当前线程。
- AsyncIOScheduler: 结合asyncio模块一起使用。
- GeventScheduler: 程序中使用gevent作为IO模型和GeventExecutor配合使用。
- TornadoScheduler: 程序中使用Tornado的IO模型,用 ioloop.add_timeout 完成定时唤醒。
- TwistedScheduler: 配合TwistedExecutor,用reactor.callLater完成定时唤醒。
- QtScheduler: 应用是一个Qt应用,需使用QTimer完成定时唤醒。
触发器(trigger)
- date是最基本的一种调度,作业任务只会执行一次。参数详见
- interval触发器,固定时间间隔触发。参数详见
- cron 触发器,在特定时间周期性地触发,和Linux crontab格式兼容。它是功能最强大的触发器。参数详见
作业存储(job store)
- 添加任务,有两种添加方法,一种add_job(), 另一种是scheduled_job()修饰器来修饰函数。
from datetime import datetime
from apscheduler.schedulers.blocking import BlockingScheduler
scheduler = BlockingScheduler() # 第一种
@scheduler.scheduled_job(job_func, 'interval', seconds=10)
def timed_task(): print(datetime.now().strftime("%Y-%m-%d %H:%M:%S")) # 第二种
scheduler.add_job(timed_task, 'interval', seconds=5)
scheduler.start()
- 删除任务,两种方法:remove_job() 和 job.remove()。remove_job()是根据任务的id来移除,所以要在任务创建的时候指定一个 id。job.remove()则是对任务执行remove方法。
scheduler.add_job(job_func, 'interval', seconds=20, id='one')
scheduler.remove_job(one) task = add_job(task_func, 'interval', seconds=2, id='job_one')
task.remvoe()
- 获取任务列表,通过scheduler.get_jobs()方法能够获取当前调度器中的所有任务的列表
tasks = scheduler.get_jobs()
- 关闭任务,使用scheduler.shutdown()默认情况下调度器会等待所有正在运行的作业完成后,关闭所有的调度器和作业存储。
scheduler.shutdown()
scheduler.shutdown(wait=false)
注意点
时区问题
报错:ValueError: Timezone offset does not match system offset: 0 != 28800. Please, check your config files.
分析:通过分析异常日志,发现APScheduler的默认timezone,而“0”是获取的系统环境变量的TZ时间28800对应timezone为“Asia/Shanghai”, 而0对应timezone为“UTC”,所以需将系统环境变量的时区与APScheduler的时区设置为一致。
解决:
#!/usr/bin/python
# -*- coding: utf-8 -*-from apscheduler.schedulers.background import BackgroundScheduler
import osos.environ['TZ']= "Asia/Shanghai"
scheduler = BackgroundScheduler(timezone="Asia/Shanghai")
其他解决方案:
如果部署应用dockerfile配置,也可以在dockerfile中设定系统时区。