资讯专栏INFORMATION COLUMN

APScheduler任务调度利器

Flink_China / 2890人阅读

摘要:中任务调度一般用中的任务调度工具也有不少等。调度器配置示例方式一方式二三略。移除调用放到,参数为调用实例的方法注意如果任务已经调度完毕,并且之后也不会再被执行的情况下,会被自动移除。可以监听调度任务执行情况相关的事件。

Java中任务调度一般用Quartz,Python中的任务调度工具也有不少:Celery,RQ,APScheduler等。
Celery:非常强大的分布式任务调度框架
RQ:基于Redis的作业队列工具
APScheduler:一款强大的任务调度工具

RQ参考Celery,据说要比Celery轻量级(Really?)
APScheduler感觉更像Quartz。
本人小小的建议是一般项目用APScheduler,因为不用像Celery那样再多带带启动worker、beat进程,而且API也很简洁。
对于大点分布式项目用Celery

官网:http://apscheduler.readthedoc...
API:http://apscheduler.readthedoc...
当前版本:3.3.0
安装:$ pip install apscheduler
例子:https://github.com/agronholm/...

特性

Advanced Python Scheduler (APScheduler) 一款强大的任务调度工具.
内置了三种调度模式:

Cron风格

间隔性(Interval-based)执行

仅在某个时间执行一次

作业存储支持以下几种方式:
Memory
SQLAlchemy (any RDBMS supported by SQLAlchemy works)
MongoDB
Redis
RethinkDB
ZooKeeper
除了Memory方式不需要序列化之外(一个例外是使用ProcessPoolExecutor),其余都需要作业函数参数可序列化。

支持与以下框架集成:
asyncio (PEP 3156)
gevent
Tornado
Twisted
Qt (using either PyQt or PySide)

基本概念

四大组件:
triggers
job stores
executors
schedulers

schedulers

内置以下几种调度器实现:
BlockingScheduler:
BackgroundScheduler: 默认采用ThreadPoolExecutor池(默认10),可以配置ProcessPoolExecutor,或同时使用
AsyncIOScheduler: 使用asyncio模块
GeventScheduler: 使用gevent
TornadoScheduler: use if you’re building a Tornado application
TwistedScheduler: use if you’re building a Twisted application
QtScheduler: use if you’re building a Qt application

基类:BaseScheduler,可以通过此类查询相关配置选项。

调度器配置示例:

方式一、

from pytz import utc

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor

jobstores = {
    "mongo": MongoDBJobStore(),
    "default": SQLAlchemyJobStore(url="sqlite:///jobs.sqlite")
}
executors = {
    "default": ThreadPoolExecutor(20),
    "processpool": ProcessPoolExecutor(5)
}
job_defaults = {
    "coalesce": False,
    "max_instances": 3
}
scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)

方式二、三:略。请看官网

启动与关闭调度器:

通过调用start()方法来启动

scheduler.start()
scheduler.shutdown()
scheduler.shutdown(wait=False)
triggers

内置以下三种触发器实现:
date: 对已仅执行一次的情绪,指定某个之间点。请参考这里
interval: 指定时间间隔(fixed intervals)周期性执行。请参考这里
cron: 使用cron风格表达式周期性执行。请参考这里

job管理

添加jobs:

调用scheduler.add_job()方法,会返回apscheduler.job.Job实例(可用于job修改、移除等)

使用装饰器scheduled_job()

作业存储注意事项:

除了Memory方式不需要序列化之外(一个例外是使用ProcessPoolExecutor),其余都需要作业函数参数可序列化。
如果需要存储作业,而且每次启动时你的应用都会重新添加一遍作业,那么请在添加job时指定一个唯一的ID,以及指定replace_existing=True,否则每次启动应用都会添加一次job的副本。
如果需要立即启动该任务,请在添加job时指定trigger参数。

移除job:
调用scheduler.remove_job()放到,参数为 job’s ID and job store alias
调用job实例的remove()方法 on the Job instance you got from add_job()

注意:如果任务已经调度完毕,并且之后也不会再被执行的情况下,会被自动移除。

job = scheduler.add_job(myfunc, "interval", minutes=2)
job.remove()

scheduler.add_job(myfunc, "interval", minutes=2, id="my_job_id")
scheduler.remove_job("my_job_id")

暂停和恢复job:

apscheduler.job.Job.pause()
apscheduler.schedulers.base.BaseScheduler.pause_job()

apscheduler.job.Job.resume()
apscheduler.schedulers.base.BaseScheduler.resume_job()

获取jobs列表

apscheduler.get_jobs()

修改job:

可以通过apscheduler.job.Job.modify() or apscheduler.modify_job()修改除了id之外的job属性。

job.modify(max_instances=6, name="Alternate name")

如果你想修改job的调度器,你可以使用apscheduler.job.Job.reschedule() or reschedule_job()

scheduler.reschedule_job("my_job_id", trigger="cron", minute="*/5")
限制同一个job实例的并发执行数

默认情况下同一个job,只允许一个job实例运行。这在某个job在下次运行时间到达之后仍未执行完毕时,能达到控制的目的。你也可以该变这一行为,在你调用add_job时可以传递max_instances=5来运行同时运行同一个job的5个job实例。

job错过执行时间与job合并(Missed job executions and coalescing):

一个job可能由于某些情况错过执行时间,比如上一点提到的,或者是线程池或进程池用光了,或者是当要调度job时,突然down机了等。
这时可以通过设置job的misfire_grace_time选项来指示之后尝试执行的次数。
当然如果这不符合你的期望,你可以合并所有错过时间的job到一个job来执行,通过设定job的coalesce=True

Scheduler events

可以监听调度、任务执行情况相关的事件。

def my_listener(event):
    if event.exception:
        print("The job crashed :(")
    else:
        print("The job worked :)")

scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)

支持的事件列表:
http://apscheduler.readthedoc...

小结

有木有非常强大,又非常易于理解的感觉。Good Job!

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/38258.html

相关文章

  • Python任务调度模块APScheduler

    介绍 官网文档:http://apscheduler.readthedoc...API:http://apscheduler.readthedoc... APScheduler是一个python的第三方库,用来提供python的后台程序。包含四个组件,分别是: triggers: 任务触发器组件,提供任务触发方式 job stores: 任务商店组件,提供任务保存方式 executors: 任务...

    zxhaaa 评论0 收藏0
  • 定时任务框架APScheduler学习详解

    摘要:安装利用进行安装源码安装有四种组成部分触发器包含调度逻辑,每一个作业有它自己的触发器,用于决定接下来哪一个作业会运行。除了他们自己初始配置意外,触发器完全是无状态的。 APScheduler简介 在平常的工作中几乎有一半的功能模块都需要定时任务来推动,例如项目中有一个定时统计程序,定时爬出网站的URL程序,定时检测钓鱼网站的程序等等,都涉及到了关于定时任务的问题,第一时间想到的是利用t...

    sewerganger 评论0 收藏0
  • Python Apscheduler源代码解析(一) 任务调度流程

    摘要:最近公司有项目需要使用到定时任务,其定时逻辑类似于的,就使用了这个类库。在一次循环结束之前会计算任务下次执行事件与当前时间之差,然后让调度线程挂起直到那个时间到来。 最近公司有项目需要使用到定时任务,其定时逻辑类似于linux的Cron,就使用了Apscheduler这个类库。基于公司的业务,需要修改Apshceduler,故而研究了一下Apscheduler的代码。 Apschedu...

    chavesgu 评论0 收藏0
  • Flask-APScheduler使用教程

    摘要:项目中需要用到定时器和循环执行。运用线程执行轮询操作,也有运用系统的的文章最多,但是太麻烦。和中间人的消息传输支持所有特性,但也提供大量其他实验性方案的支持,包括用进行本地开发。同时也包含了对任务的控制。后续有需求在继续。 项目中需要用到定时器和循环执行。去网上搜了一下,比较常见的有一下集中。运用Python线程执行轮询操作,也有运用Linux系统的Cron,Celery的文章最多,但...

    Noodles 评论0 收藏0
  • 关于Flask Schedule

    摘要:日期触发一次性指定日期作业的运行日期或时间指定时区运行一次运行一次间隔调度间隔几周间隔几天间隔几小时间隔几分钟间隔多少秒开始日期结束日期时区每两个小时调一下触发年,位数字月范围日范围周范围周内第几天或者星期几范围或者时范围 Flask Schedule Flask-APScheduler a Flask extension supported for the APScheduler w...

    邹强 评论0 收藏0

发表评论

0条评论

Flink_China

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<