摘要:项目中需要用到定时器和循环执行。运用线程执行轮询操作,也有运用系统的的文章最多,但是太麻烦。和中间人的消息传输支持所有特性,但也提供大量其他实验性方案的支持,包括用进行本地开发。同时也包含了对任务的控制。后续有需求在继续。
项目中需要用到定时器和循环执行。去网上搜了一下,比较常见的有一下集中。运用Python线程执行轮询操作,也有运用Linux系统的Cron,Celery的文章最多,但是太麻烦。看看就知道,Celery 需要一个发送和接受消息的传输者。RabbitMQ 和 Redis 中间人的消息传输支持所有特性,但也提供大量其他实验性方案的支持,包括用 SQLite 进行本地开发。需要用到队列,对于这点需求简直就是大材小用。最后找到了比较合适的Flask-APScheduler。
介绍看看 github的flask-apscheduler介绍。
Loads scheduler configuration from Flask configuration.(支持从Flask中加载调度)
Loads job definitions from Flask configuration.(支持从Flask中加载任务配置)
Allows to specify the hostname which the scheduler will run on.(允许指定服务器运行任务)
Provides a REST API to manage the scheduled jobs.(提供Rest接口管理任务)
Provides authentication for the REST API.(提供Rest接口认证)
安装及配置pip install Flask-APScheduler
在Flask配置文件中添加
SCHEDULER_API_ENABLED = True JOBS = [ { "id": "job_1h_data", "func": job_1h_data, "args": "", "trigger": { "type": "cron", "day_of_week": "0-6", "hour": "*", "minute": "1", "second": "0" } }, { "id": "job_announce", "func": exchange_an, "args": "", "trigger": "interval", "seconds": 300 } ]
上面指定了每一小时获取所有货币24h最高位以及交易所公告。
获取公告def exchange_an(): """ :param start_date: 开始时间 YYYY-MM-DD HH:MM:SS :param end_date: 结束时间 YYYY-MM-DD HH:MM:SS :return: 推送消息,保持数据库 """ current_local = time.time() start_date = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(current_local - 300)) end_date = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(current_local)) announce = pro.query("exchange_ann", start_date=start_date, end_date=end_date) print("请求交易所公告...") for x in announce.values: s = { "title": x[0], "content": x[1], "type": x[2], "url": x[3], "datetime": x[4] } value = json.dumps(s) print(value) mqttClient.publish("system/ex_announce", value)动态添加任务
# coding:utf-8 from apscheduler.schedulers.blocking import BlockingScheduler import datetime def aps_test(x): print datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), x scheduler = BlockingScheduler() scheduler.add_job(func=aps_test, args=("定时任务",), trigger="cron", second="*/5") scheduler.add_job(func=aps_test, args=("一次性任务",), next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=12)) scheduler.add_job(func=aps_test, args=("循环任务",), trigger="interval", seconds=3, id="interval_task") scheduler.start() """ 暂停任务 """ scheduler.pause_job("interval_task") """ 恢复任务 """ scheduler.resume_job("interval_task") """ 删除任务 """ scheduler.remove_job("interval_task")
apscheduler支持添加三种方式的任务,分别是定时任务,一次性任务及循环任务。同时也包含了对任务的控制。
总结因为是单机版本,所以指定服务器运行任务,Rest接口管理任务,Rest接口认证就不写了。后续有需求在继续。
欢迎长按下图 -> 识别图中二维码或者微信扫一扫关注我的公众号
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/42439.html
摘要:日期触发一次性指定日期作业的运行日期或时间指定时区运行一次运行一次间隔调度间隔几周间隔几天间隔几小时间隔几分钟间隔多少秒开始日期结束日期时区每两个小时调一下触发年,位数字月范围日范围周范围周内第几天或者星期几范围或者时范围 Flask Schedule Flask-APScheduler a Flask extension supported for the APScheduler w...
摘要:贡献者飞龙版本最近总是有人问我,把这些资料看完一遍要用多长时间,如果你一本书一本书看的话,的确要用很长时间。为了方便大家,我就把每本书的章节拆开,再按照知识点合并,手动整理了这个知识树。 Special Sponsors showImg(https://segmentfault.com/img/remote/1460000018907426?w=1760&h=200); 贡献者:飞龙版...
摘要:一步一步学习一直都有发布他开发的教程。在上有他免费的教程,并且宣称是世上最深入的系列。基础在上有个非常的视频教程。的官网教程非常值得你从头读到尾。使用框架这是我们最后一个教程的介绍。不过在和已经有为你准备了不错的免费课程哈 一步一步学习Vue 2 (Laracasts) Jeffrey Way一直都有发布他web开发的教程。他曾经在30天内教会了我使用jquery。在Laracast...
摘要:层叠样式表二修订版这是对作出的官方说明。速查表两份表来自一份关于基础特性,一份关于布局。核心第一篇一份来自的基础参考指南简写速查表简写形式参考书使用层叠样式表基础指南,包含使用的好处介绍个方法快速写成高质量的写出高效的一些提示。 迄今为止,我已经收集了100多个精通CSS的资源,它们能让你更好地掌握CSS技巧,使你的布局设计脱颖而出。 CSS3 资源 20个学习CSS3的有用资源 C...
摘要:下表整理了目前的版本与版本的兼容关系还未所以,不论您是在读我的基础教程基础教程还是正在连载的系列教程。 这篇博文是临时增加出来的内容,主要是由于最近连载《Spring Cloud Alibaba基础教程》系列的时候,碰到读者咨询的大量问题中存在一个比较普遍的问题:版本的选择。其实这类问题,在之前写Spring Cloud基础教程的时候,就已经发过一篇《聊聊Spring Cloud版本的...
阅读 1026·2021-11-22 15:35
阅读 1659·2021-10-26 09:49
阅读 3192·2021-09-02 15:11
阅读 2023·2019-08-30 15:53
阅读 2609·2019-08-30 15:53
阅读 2889·2019-08-30 14:11
阅读 3494·2019-08-30 12:59
阅读 3192·2019-08-30 12:53