介绍
官网文档:http://apscheduler.readthedoc...
API:http://apscheduler.readthedoc...
APScheduler是一个python的第三方库,用来提供python的后台程序。包含四个组件,分别是:
triggers: 任务触发器组件,提供任务触发方式
job stores: 任务商店组件,提供任务保存方式
executors: 任务调度组件,提供任务调度方式
schedulers: 任务调度组件,提供任务工作方式
安装pip 安装
$ pip install apscheduler
源码安装
$ python setup.py install简单的实例
from apscheduler.schedulers.blocking import BlockingScheduler import time # 实例化一个调度器 scheduler = BlockingScheduler() def job1(): print "%s: 执行任务" % time.asctime() # 添加任务并设置触发方式为3s一次 scheduler.add_job(job1, "interval", seconds=3) # 开始运行调度器 scheduler.start()
输出:
$ python first.py Fri Sep 8 20:41:55 2017: 执行任务 Fri Sep 8 20:41:58 2017: 执行任务 ...各组件功能 trigger组件
trigger提供任务的触发方式,共三种方式:
date:只在某个时间点执行一次run_date(datetime|str)
scheduler.add_job(my_job, "date", run_date=date(2017, 9, 8), args=[]) scheduler.add_job(my_job, "date", run_date=datetime(2017, 9, 8, 21, 30, 5), args=[]) scheduler.add_job(my_job, "date", run_date="2017-9-08 21:30:05", args=[]) # The "date" trigger and datetime.now() as run_date are implicit sched.add_job(my_job, args=[[])
interval: 每隔一段时间执行一次weeks=0 | days=0 | hours=0 | minutes=0 | seconds=0, start_date=None, end_date=None, timezone=None
scheduler.add_job(my_job, "interval", hours=2) scheduler.add_job(my_job, "interval", hours=2, start_date="2017-9-8 21:30:00", end_date="2018-06-15 21:30:00) @scheduler.scheduled_job("interval", id="my_job_id", hours=2) def my_job(): print("Hello World")
cron: 使用同linux下crontab的方式(year=None, month=None, day=None, week=None, day_of_week=None, hour=None, minute=None, second=None, start_date=None, end_date=None, timezone=None)
sched.add_job(my_job, "cron", hour=3, minute=30) sched.add_job(my_job, "cron", day_of_week="mon-fri", hour=5, minute=30, end_date="2017-10-30") @sched.scheduled_job("cron", id="my_job_id", day="last sun") def some_decorated_task(): print("I am printed at 00:00:00 on the last Sunday of every month!")scheduler组件
scheduler组件提供执行的方式,在不同的运用环境中选择合适的方式
BlockingScheduler: 进程中只运行调度器时的方式
from apscheduler.schedulers.blocking import BlockingScheduler import time scheduler = BlockingScheduler() def job1(): print "%s: 执行任务" % time.asctime() scheduler.add_job(job1, "interval", seconds=3) scheduler.start()
BackgroundScheduler: 不想使用任何框架时的方式
from apscheduler.schedulers.background import BackgroundScheduler import time scheduler = BackgroundScheduler() def job1(): print "%s: 执行任务" % time.asctime() scheduler.add_job(job1, "interval", seconds=3) scheduler.start() while True: pass
AsyncIOScheduler: asyncio module的方式(Python3)
from apscheduler.schedulers.asyncio import AsyncIOScheduler try: import asyncio except ImportError: import trollius as asyncio ... ... # while True:pass try: asyncio.get_event_loop().run_forever() except (KeyboardInterrupt, SystemExit): pass
GeventScheduler: gevent方式
from apscheduler.schedulers.gevent import GeventScheduler ... ... g = scheduler.start() # while True:pass try: g.join() except (KeyboardInterrupt, SystemExit): pass
TornadoScheduler: Tornado方式
from tornado.ioloop import IOLoop from apscheduler.schedulers.tornado import TornadoScheduler ... ... # while True:pass try: IOLoop.instance().start() except (KeyboardInterrupt, SystemExit): pass
TwistedScheduler: Twisted方式
from twisted.internet import reactor from apscheduler.schedulers.twisted import TwistedScheduler ... ... # while True:pass try: reactor.run() except (KeyboardInterrupt, SystemExit): pass
QtScheduler: Qt方式
executors组件executors组件提供任务的调度方式
base
debug
gevent
pool(max_workers=10)
twisted
jobstore组件jobstore提供任务的各种持久化方式
base
memory
mongodb
scheduler.add_jobstore("mongodb", collection="example_jobs")
redis
scheduler.add_jobstore("redis", jobs_key="example.jobs", run_times_key="example.run_times")
rethinkdb
scheduler.add_jobstore("rethinkdb", database="apscheduler_example")
sqlalchemy
scheduler.add_jobstore("sqlalchemy", url=url)
zookeeper
scheduler.add_jobstore("zookeeper", path="/example_jobs")
删除任务remove_job如果使用了任务的存储,开启时最好添加replace_existing=True,否则每次开启都会创建任务的副本
开启后任务不会马上启动,可修改trigger参数
# 根据任务实例删除 job = scheduler.add_job(myfunc, "interval", minutes=2) job.remove() # 根据任务id删除 scheduler.add_job(myfunc, "interval", minutes=2, id="my_job_id") scheduler.remove_job("my_job_id")任务的暂停pause_job和继续resume_job
job = scheduler.add_job(myfunc, "interval", minutes=2) # 根据任务实例 job.pause() job.resume() # 根据任务id暂停 scheduler.add_job(myfunc, "interval", minutes=2, id="my_job_id") scheduler.pause_job("my_job_id") scheduler.resume_job("my_job_id")任务的修饰modify和重设reschedule_job
修饰:job.modify(max_instances=6, name="Alternate name")
重设:scheduler.reschedule_job("my_job_id", trigger="cron", minute="*/5")
开启 scheduler.start()
关闭 scheduler.shotdown(wait=True | False)
暂停 scheduler.pause()
继续 scheduler.resume()
监听 http://apscheduler.readthedoc...
def my_listener(event): if event.exception: print("The job crashed :(") else: print("The job worked :)") scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)官方实例
from pytz import utc from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.jobstores.mongodb import MongoDBJobStore from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor jobstores = { "mongo": MongoDBJobStore(), "default": SQLAlchemyJobStore(url="sqlite:///jobs.sqlite") } executors = { "default": ThreadPoolExecutor(20), "processpool": ProcessPoolExecutor(5) } job_defaults = { "coalesce": False, "max_instances": 3 } scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/40820.html
摘要:安装利用进行安装源码安装有四种组成部分触发器包含调度逻辑,每一个作业有它自己的触发器,用于决定接下来哪一个作业会运行。除了他们自己初始配置意外,触发器完全是无状态的。 APScheduler简介 在平常的工作中几乎有一半的功能模块都需要定时任务来推动,例如项目中有一个定时统计程序,定时爬出网站的URL程序,定时检测钓鱼网站的程序等等,都涉及到了关于定时任务的问题,第一时间想到的是利用t...
摘要:中任务调度一般用中的任务调度工具也有不少等。调度器配置示例方式一方式二三略。移除调用放到,参数为调用实例的方法注意如果任务已经调度完毕,并且之后也不会再被执行的情况下,会被自动移除。可以监听调度任务执行情况相关的事件。 Java中任务调度一般用Quartz,Python中的任务调度工具也有不少:Celery,RQ,APScheduler等。Celery:非常强大的分布式任务调度框架RQ...
摘要:最近公司有项目需要使用到定时任务,其定时逻辑类似于的,就使用了这个类库。在一次循环结束之前会计算任务下次执行事件与当前时间之差,然后让调度线程挂起直到那个时间到来。 最近公司有项目需要使用到定时任务,其定时逻辑类似于linux的Cron,就使用了Apscheduler这个类库。基于公司的业务,需要修改Apshceduler,故而研究了一下Apscheduler的代码。 Apschedu...
摘要:项目中需要用到定时器和循环执行。运用线程执行轮询操作,也有运用系统的的文章最多,但是太麻烦。和中间人的消息传输支持所有特性,但也提供大量其他实验性方案的支持,包括用进行本地开发。同时也包含了对任务的控制。后续有需求在继续。 项目中需要用到定时器和循环执行。去网上搜了一下,比较常见的有一下集中。运用Python线程执行轮询操作,也有运用Linux系统的Cron,Celery的文章最多,但...
摘要:今天介绍在中使用定时任务的两种方式。添加并启动定时任务其它命令显示当前的定时任务删除所有定时任务今天的定时任务就说到这里,有错误之处,欢迎交流指正 今天介绍在django中使用定时任务的两种方式。 方式一: APScheduler1)安装: pip install apscheduler 2)使用: from apscheduler.scheduler import Scheduler...
阅读 2888·2021-10-19 10:09
阅读 3097·2021-10-09 09:41
阅读 3347·2021-09-26 09:47
阅读 2616·2019-08-30 15:56
阅读 572·2019-08-29 17:04
阅读 957·2019-08-26 11:58
阅读 2487·2019-08-26 11:51
阅读 3302·2019-08-26 11:29