摘要:在使用前必须实例化,称为或。在中发送消息时,该消息仅包含要执行的的名称。每一个维护一个名称和对应函数的映射,这称为。虽然可以依赖于当前应用,但最佳实践是将应用实例传递给任何需要它的对象,这个行为可以称为。
Celery在使用前必须实例化,称为application或app。app是线程安全的,具有不同配置、组件、task的多个Celery应用可以共存于同一个进程空间。
# 创建Celery应用 >>> from celery import Celery >>> app = Celery() >>> app
最后一行文本化显示了Celery应用:包含应用所属类的名称,当前主模块名,以及内存地址。唯一重要的信息是模块名称。
Main Name在Celery中发送task消息时,该消息仅包含要执行的task的名称。每一个worker维护一个task名称和对应函数的映射,这称为task registry。
当定义一个task时,该task将注册到本地:
>>> @app.task ... def add(x, y): ... return x + y >>> add <@task: __main__.add> >>> add.name __main__.add >>> app.tasks["__main__.add"] <@task: __main__.add>
当Celery无法检测task函数属于哪个模块时,使用main模块名生成初始task名称。
这种方式仅适用于以下两种场景:
定义task的模块作为程序运行
app在python shell中创建
# tasks.py from celery import Celery app = Celery() @app.task def add(x, y): return x + y if __name__ == "__main__": app.worker_main()
如果直接运行tasks.py,task名将以__main__为前缀,但如果tasks.py被其他程序导入,task名将以tasks为前缀。如下:
>>> from tasks import add >>> add.name tasks.add
也可以直接指定主模块名:
>>> app = Celery("tasks") >>> app.main "tasks" >>> @app.task ... def add(x, y): ... return x + y >>> add.name tasks.addConfiguration
可以通过直接设置,或使用专用配置模块对Celery进行配置。
通过app.conf属性查看或直接设置配置:
>>> app.conf.timezone "Europe/London" >>> app.conf.enable_utc = True
或用app.conf.update方法一次更新多个配置:
>>> app.conf.update( ... enable_utc=True, ... timezone="Europe/London", ...)config_from_object
app.config_from_object()方法从配置模块或对象中导入配置。需要注意的是:调用config_from_object()方法将重置在这之前配置的任何设置。
使用模块名
app.config_from_object()方法接收python模块的完全限定名(fully qualified name)或具体到其中的某个属性名,例如"celeryconfig", "myproj.config.celery", 或"myproj.config:CeleryConfig":
from celery import Celery app = Celery() app.config_from_object("celeryconfig")
只要能够正常执行import celeryconfig,app就能正常配置。
使用模块对象
也可以传入一个已导入的模块对象,但不建议这样做。
import celeryconfig from celery import Celery app = Celery() app.config_from_object(celeryconfig)
更推荐使用模块名的方式,因为这样在使用prefork pool时不需要序列化该模块。如果在实际应用中出现配置问题或序列化错误,请尝试使用模块名的方式。
使用配置类或对象
from celery import Celery app = Celery() class Config: enable_utc = True timezone = "Europe/London" app.config_from_object(Config)config_from_envvar
app.config_from_envvar()方法从环境变量中接收配置模块名。
import os from celery import Celery #: Set default configuration module name os.environ.setdefault("CELERY_CONFIG_MODULE", "celeryconfig") app = Celery() app.config_from_envvar("CELERY_CONFIG_MODULE")
通过环境变量指定配置模块:
$ CELERY_CONFIG_MODULE="celeryconfig.prod" celery worker -l infoCensored configuration
如果要显示Celery配置,可能需要过滤某些敏感信息如密码、密钥等。Celery提供了几种用于帮助显示配置的实用方法。
humanize()
该方法按行返回字符串形式的配置,默认只包含改动过的配置,如果要显示内置的默认配置,设置with_defaults参数为True:
>>> app.conf.humanize(with_defaults=False, censored=True)
table()
该方法返回字典形式的配置:
>>> app.conf.table(with_defaults=False, censored=True)
Celery不会移除所有的敏感信息,因为它使用正则表达式匹配键并判断是否移除。如果用户添加了包含敏感信息的自定义配置,可以使用Celery可能标记为敏感配置的名称来命名(API, TOKEN, KEY, SECRET, PASS, SIGNATURE, DATABASE)。
Laziness应用实例是惰性的。
创建Celery实例只会执行以下操作:
创建用于event的logical clock instance
创建task registry
设置为当前应用(除非禁用了set_as_current参数)
调用app.on_init()回调函数(默认不执行任何操作)
app.task()装饰器不会在task定义时立即创建task,而是在task使用时或finalized应用后创建。
下例说明了在使用task或访问其属性前,都不会创建task:
>>> @app.task >>> def add(x, y): ... return x + y >>> type(add)>>> add.__evaluated__() False >>> add # <-- causes repr(add) to happen <@task: __main__.add> >>> add.__evaluated__() True
应用的Finalization指显式地调用app.finalize()方法或隐式地访问app.tasks属性。
finalized应用将会:
复制必须在应用间共享的task。task默认是共享的,但如果禁用了task装饰器的shared属性,将属于应用私有。
评估所有待处理的task装饰器
确保所有task绑定到当前应用。将task绑定到某个应用,以便可以从配置中读取默认值。
Breaking the chain虽然可以依赖于当前应用,但最佳实践是将应用实例传递给任何需要它的对象,这个行为可以称为app chain。
# 依赖于当前应用(bad) from celery import current_app class Scheduler(object): def run(self): app = current_app
# 传递应用实例(good) class Scheduler(object): def __init__(self, app): self.app = app
在开发模式设置CELERY_TRACE_APP环境变量,可以在应用链断开时抛出异常:
$ CELERY_TRACE_APP=1 celery worker -l infoAbstract Tasks
使用task()装饰器创建的task都继承自celery.app.task模块的Task基类。继承该类可以自定义task类:
from celery import Task # 或者 from celery.app.task import Task class DebugTask(Task): def __call__(self, *args, **kwargs): print("TASK STARTING: {0.name}[{0.request.id}]".format(self)) return super(DebugTask, self).__call__(*args, **kwargs)
如果要重写__call__()方法,记得调用super。这样在task直接调用时会执行基类的默认事件。
Task基类是特殊的,因为它并未绑定到任何特定的应用。一旦task绑定到应用,它将读取配置以设置默认值等。
通过base参数指定基类
@app.task(base=DebugTask) def add(x, y): return x + y
通过app.Task属性指定基类
>>> from celery import Celery, Task >>> app = Celery() >>> class MyBaseTask(Task): ... queue = "hipri" >>> app.Task = MyBaseTask >>> app.Task>>> @app.task ... def add(x, y): ... return x + y >>> add <@task: __main__.add> >>> add.__class__.mro() [ >, , , ]
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/43221.html
摘要:所以这就现实了在中使用的应用上下文。要引入请求上下文,需要考虑这两个问题如何在中产生请求上下文。中有和可以产生请求上下文。具体的思路还是在中重载类,通过,在的上下文环境下执行。将他们传入,生成伪造的请求上下文可以覆盖大多数的使用情况。 其实我只是想把邮件发送这个动作移到Celery中执行。既然用到了Celery,那么每次发邮件都单独开一个线程似乎有点多余,异步任务还是交给Celery吧...
摘要:使用异步框架,例如等等,装饰异步任务。它是一个专注于实时处理的任务队列,同时也支持任务调度。不存储任务状态。标识要使用的默认序列化方法的字符串。指定该任务的结果存储后端用于此任务。 概述: 我们考虑一个场景,公司有一个需求,现在需要做一套web系统,而这套系统某些功能需要使用...
摘要:中常用的几个框架有等,今天来总结一下和的不同。本文使用的环境是。文件可以加载路由信息和项目配置信息,文件负责启动项目。以上就简单的比较了和几个方面的不同,它们各有优缺点,实际工作中可以根据不同的需求选择不同的框架进行开发。 python中常用的几个web框架有django, tornado, flask等,今天来总结一下django和tornado的不同。工作中django和torna...
摘要:基于网,分享项目的组网架构和部署。项目组网架构架构说明流项目访问分为两个流,通过分两个端口暴露给外部使用数据流用户访问网站。通过进行配置,使用作为异步队列来存储任务,并将处理结果存储在中。 基于Raindrop网,分享项目的组网架构和部署。 项目组网架构 showImg(https://cloud.githubusercontent.com/assets/7239657/1015704...
摘要:主要是为了实现系统之间的双向解耦而实现的。问题及优化队列过长问题使用上述方案的异步非阻塞可能会依赖于的任务队列长度,若队列中的任务过多,则可能导致长时间等待,降低效率。 Tornado和Celery介绍 1.Tornado Tornado是一个用python编写的一个强大的、可扩展的异步HTTP服务器,同时也是一个web开发框架。tornado是一个非阻塞式web服务器,其速度相当快。...
阅读 3492·2019-08-30 15:53
阅读 3408·2019-08-29 16:54
阅读 2194·2019-08-29 16:41
阅读 2400·2019-08-23 16:10
阅读 3379·2019-08-23 15:04
阅读 1346·2019-08-23 13:58
阅读 348·2019-08-23 11:40
阅读 2454·2019-08-23 10:26