资讯专栏INFORMATION COLUMN

celery动态添加任务

yanbingyun1990 / 3395人阅读

摘要:是一个基于的分布式调度系统,文档在这最近有个需求想要动态的添加任务而不用重启服务找了一圈没找到什么好办法也有可能是文档没看仔细,所以只能自己实现囉为动态添加任务,首先我想到的是传递一个函数进去,让某个特定任务去执行这个传递过去的函数,就像这

celery是一个基于Python的分布式调度系统,文档在这 ,最近有个需求,想要动态的添加任务而不用重启celery服务,找了一圈没找到什么好办法(也有可能是文档没看仔细),所以只能自己实现囉

为celery动态添加任务,首先我想到的是传递一个函数进去,让某个特定任务去执行这个传递过去的函数,就像这样

@app.task
def execute(func, *args, **kwargs):
    return func(*args, **kwargs)

很可惜,会出现这样的错误

kombu.exceptions.EncodeError: Object of type "function" is not JSON serializable

换一种序列化方式

@app.task(serializer="pickle")
def execute(func, *args, **kwargs):
    return func(*args, **kwargs)

结果又出现一大串错误信息

ERROR/MainProcess] Pool callback raised exception: ContentDisallowed("Refusing to deserialize untrusted content of type pickle (application/x-python-serialize)",)
Traceback (most recent call last):
  File "/home/jl/.virtualenvs/test/lib/python3.6/site-packages/kombu/utils/objects.py", line 42, in __get__
    return obj.__dict__[self.__name__]
KeyError: "chord"

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/jl/.virtualenvs/test/lib/python3.6/site-packages/kombu/utils/objects.py", line 42, in __get__
    return obj.__dict__[self.__name__]
KeyError: "_payload"

换一种思路

func = import_string(func)

不知道这样是否可以,结果测试: No

哎,流年不利.

最后一直测试,一直测试,终于找到了一种办法,直接上代码

from importlib import import_module, reload

app.conf.CELERY_IMPORTS = ["task", "task.all_task"]

def import_string(import_name):
    import_name = str(import_name).replace(":", ".")
    modules = import_name.split(".")
    mod = import_module(modules[0])
    for comp in modules[1:]:
        if not hasattr(mod, comp):
            reload(mod)
        mod = getattr(mod, comp)
    return mod

@app.task
def execute(func, *args, **kwargs):
    func = import_string(func)
    return func(*args, **kwargs)

项目结构是这样的

├── celery_app.py
├── config.py
├── task
│   ├── all_task.py
│   ├── __init__.py

注意: 任务必须大于等于两层目录

以后每次添加任务都可以先添加到all_task.py里,调用时不用再重启celery服务

# task/all_task.py

def ee(c, d):
    return c, d, "你好"

# example
from celery_app import execute

execute.delay("task.all_task.ee", 2, 444)

ok,另外发现celery也支持任务定时调用,就像这样

execute.apply_async(args=["task.all_task.aa"], eta=datetime(2017, 7, 9, 8, 12, 0))

简单实现一个任务重复调用的功能

@app.task
def interval(func, seconds, args=(), task_id=None):
    next_run_time = current_time() + timedelta(seconds=seconds)
    kwargs = dict(args=(func, seconds, args), eta=next_run_time)
    if task_id is not None:
        kwargs.update(task_id=task_id)
    interval.apply_async(**kwargs)
    func = import_string(func)
    return func(*args)

大概意思就是先计算下次运行的时间,然后把任务添加到celery队列里,这里有个task_id有些问题,因为假设添加了每隔3s执行一个任务,
它的task_id默认会使用uuid生成,如果想要再移除这个任务就不太方便,自定task_id可能会好一些,另外也许需要判断task_id是否存在

AsyncResult(task_id).state

ok,再献上一个好用的函数

from inspect import getmembers, isfunction

def get_tasks(module="task"):
    return [{
        "name": "task:{}".format(f[1].__name__),
        "doc": f[1].__doc__,
    } for f in getmembers(import_module(module), isfunction)]

就这样.

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/38697.html

相关文章

  • Celery中使用Flask的上下文

    摘要:所以这就现实了在中使用的应用上下文。要引入请求上下文,需要考虑这两个问题如何在中产生请求上下文。中有和可以产生请求上下文。具体的思路还是在中重载类,通过,在的上下文环境下执行。将他们传入,生成伪造的请求上下文可以覆盖大多数的使用情况。 其实我只是想把邮件发送这个动作移到Celery中执行。既然用到了Celery,那么每次发邮件都单独开一个线程似乎有点多余,异步任务还是交给Celery吧...

    Sourcelink 评论0 收藏0
  • Flask+Celery+Redis实现队列化异步任务

    摘要:使用异步框架,例如等等,装饰异步任务。它是一个专注于实时处理的任务队列,同时也支持任务调度。不存储任务状态。标识要使用的默认序列化方法的字符串。指定该任务的结果存储后端用于此任务。 概述:         我们考虑一个场景,公司有一个需求,现在需要做一套web系统,而这套系统某些功能需要使用...

    Ali_ 评论0 收藏0
  • Python之celery的简介与使用

    摘要:的简介是一个基于分布式消息传输的异步任务队列,它专注于实时处理,同时也支持任务调度。目前支持等作为消息代理,但适用于生产环境的只有和官方推荐。任务处理完后保存状态信息和结果,以供查询。 celery的简介   celery是一个基于分布式消息传输的异步任务队列,它专注于实时处理,同时也支持任务调度。它的执行单元为任务(task),利用多线程,如Eventlet,gevent等,它们能被...

    LeexMuller 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<