[ ]
ZhouSa.com 1. 定时任务
我们当然可以使用crontab或者airflow来执行定时任务,但如果我们希望不依赖其他工具只用python实现定时任务,那就需要使用第三方库来实现了,目前我知道的比较好用的定时任务库有两个:
apscheduler
支持最多的底层实现方式,因此比较重,同时定时语法可读性不高schedule
只支持线程模型,但定时语法可读性比较高,接口也较为简单
我个人比较建议使用线程模型就用schedule
,使用协程模型就用apscheduler
1.1. 使用schedule做基于线程的定时任务
schedule这个库.
这个库有两种用法:
- 起一个进行做定时任务
- 结合其他服务,挂在后台做定时任务
一个基本的定时任务结构如下:
import schedule import time def job(): print("I'm working...") schedule.every(10).seconds.do(job) count=0# while True: if count >= 15:# break# schedule.run_pending() time.sleep(1) count+=1
I'm working... I'm working... I'm working...
打注释的部分是不用的,通常我们需要维持主线程是一个死循环.
上面的代码中实际执行的部分在job
函数中.而定义日程表则使用的schedule
对象
如果要让主进程不用堵塞(通常用在服务上),我们可以为其打个补丁
from types import MethodType import schedule import time import threading def run_continuously(self, interval=1): """Continuously run, while executing pending jobs at each elapsed time interval. @return cease_continuous_run: threading.Event which can be set to cease continuous run. Please note that it is *intended behavior that run_continuously() does not run missed jobs*. For example, if you've registered a job that should run every minute and you set a continuous run interval of one hour then your job won't be run 60 times at each interval but only once. """ cease_continuous_run = threading.Event() class ScheduleThread(threading.Thread): @classmethod def run(cls): while not cease_continuous_run.is_set(): self.run_pending() time.sleep(interval) continuous_thread = ScheduleThread() continuous_thread.start() return cease_continuous_run schedule.default_scheduler.run_continuously = MethodType(run_continuously, schedule.default_scheduler) def run_continuously(interval=1): """Continuously run, while executing pending jobs at each elapsed time interval. @return cease_continuous_run: threading.Event which can be set to cease continuous run. Please note that it is *intended behavior that run_continuously() does not run missed jobs*. For example, if you've registered a job that should run every minute and you set a continuous run interval of one hour then your job won't be run 60 times at each interval but only once. """ return schedule.default_scheduler.run_continuously(interval) schedule.run_continuously = run_continuously
def job(): print("I'm working...") schedule.every(10).seconds.do(job) schedule.run_continuously()
I'm working... <threading.Event at 0x108eff358> I'm working... I'm working... I'm working... I'm working... I'm working... I'm working...
1.2. 使用apscheduler做基于协程的定时任务
apscheduler的使用方式也简单,将job封装成协程函数,放入scheduler中,设置好间隔时间就行
from apscheduler.schedulers.asyncio import AsyncIOScheduler scheduler = AsyncIOScheduler() async def job(): print("I'm working...") scheduler.add_job(job, 'interval', seconds=3) scheduler.start()
I'm working... I'm working...
scheduler.pause()
还没有评论,来说两句吧...