资讯专栏INFORMATION COLUMN

Python Apscheduler源代码解析(一) 任务调度流程

chavesgu / 1232人阅读

摘要:最近公司有项目需要使用到定时任务,其定时逻辑类似于的,就使用了这个类库。在一次循环结束之前会计算任务下次执行事件与当前时间之差,然后让调度线程挂起直到那个时间到来。

最近公司有项目需要使用到定时任务,其定时逻辑类似于linux的Cron,就使用了Apscheduler这个类库。基于公司的业务,需要修改Apshceduler,故而研究了一下Apscheduler的代码。

Apscheduler的调度逻辑非常简单,越简单的东西往往也越有效。

调度器会开辟一个线程,这个线程会循环的从job_store中找到任务,计算任务的执行时间,并与当前时间做比较。如果任务的执行事件<=当前时间,就将任务的firetime放到一个列表中(runtimes)

    def _get_run_times(self, now):
        run_times = []
        next_run_time = self.next_run_time
        while next_run_time and next_run_time <= now:
            run_times.append(next_run_time)
            next_run_time = self.trigger.get_next_fire_time(next_run_time, now)

        return run_times

如果runtimes不为空,就将其放入Executor中,下面代码中的executor不是Python的线程池类,是Apscheduler的一个类,当然了,最终的结果是将任务放到线程池当中

                if run_times:
                    try:
                        executor.submit_job(job, run_times)

在BaseExecutor类中,有一个abstract method,负责将任务放到线程池当中,在其子类BasePoolExecutor中,继承了这个方法

    def _do_submit_job(self, job, run_times):
        def callback(f):
            exc, tb = (f.exception_info() if hasattr(f, "exception_info") else
                       (f.exception(), getattr(f.exception(), "__traceback__", None)))
            if exc:
                self._run_job_error(job.id, exc, tb)
            else:
                self._run_job_success(job.id, f.result())

        f = self._pool.submit(run_job, job, job._jobstore_alias, run_times, self._logger.name)
        f.add_done_callback(callback)

代码中的self._pool可以是线程池,也可以是进程池,在concurrent.futures包中,已经是python3的标准类库了。

关于调度器的事件循环,如果让他一直循环不断的从job_store中取任务,然后判断,这样会十分浪费资源。Apscheduler在一次循环结束之前会计算任务下次执行事件与当前时间之差,然后让调度线程挂起直到那个时间到来。

    def _main_loop(self):
        wait_seconds = TIMEOUT_MAX
        while self.state != STATE_STOPPED:
            self._event.wait(wait_seconds)
            self._event.clear()
            wait_seconds = self._process_jobs()

self._process_jobs()的返回值就是上面说的那个时间,self._event.wait(wait_seconds)就是让当前线程等待这么长的一段时间

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/40983.html

相关文章

  • Python任务调度模块APScheduler

    介绍 官网文档:http://apscheduler.readthedoc...API:http://apscheduler.readthedoc... APScheduler是一个python的第三方库,用来提供python的后台程序。包含四个组件,分别是: triggers: 任务触发器组件,提供任务触发方式 job stores: 任务商店组件,提供任务保存方式 executors: 任务...

    zxhaaa 评论0 收藏0
  • APScheduler任务调度利器

    摘要:中任务调度一般用中的任务调度工具也有不少等。调度器配置示例方式一方式二三略。移除调用放到,参数为调用实例的方法注意如果任务已经调度完毕,并且之后也不会再被执行的情况下,会被自动移除。可以监听调度任务执行情况相关的事件。 Java中任务调度一般用Quartz,Python中的任务调度工具也有不少:Celery,RQ,APScheduler等。Celery:非常强大的分布式任务调度框架RQ...

    Flink_China 评论0 收藏0
  • 定时任务框架APScheduler学习详解

    摘要:安装利用进行安装源码安装有四种组成部分触发器包含调度逻辑,每一个作业有它自己的触发器,用于决定接下来哪一个作业会运行。除了他们自己初始配置意外,触发器完全是无状态的。 APScheduler简介 在平常的工作中几乎有一半的功能模块都需要定时任务来推动,例如项目中有一个定时统计程序,定时爬出网站的URL程序,定时检测钓鱼网站的程序等等,都涉及到了关于定时任务的问题,第一时间想到的是利用t...

    sewerganger 评论0 收藏0
  • Flask-APScheduler使用教程

    摘要:项目中需要用到定时器和循环执行。运用线程执行轮询操作,也有运用系统的的文章最多,但是太麻烦。和中间人的消息传输支持所有特性,但也提供大量其他实验性方案的支持,包括用进行本地开发。同时也包含了对任务的控制。后续有需求在继续。 项目中需要用到定时器和循环执行。去网上搜了一下,比较常见的有一下集中。运用Python线程执行轮询操作,也有运用Linux系统的Cron,Celery的文章最多,但...

    Noodles 评论0 收藏0
  • django开发-定时任务的使用

    摘要:今天介绍在中使用定时任务的两种方式。添加并启动定时任务其它命令显示当前的定时任务删除所有定时任务今天的定时任务就说到这里,有错误之处,欢迎交流指正 今天介绍在django中使用定时任务的两种方式。 方式一: APScheduler1)安装: pip install apscheduler 2)使用: from apscheduler.scheduler import Scheduler...

    wean 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<