摘要:所以这就现实了在中使用的应用上下文。要引入请求上下文,需要考虑这两个问题如何在中产生请求上下文。中有和可以产生请求上下文。具体的思路还是在中重载类,通过,在的上下文环境下执行。将他们传入,生成伪造的请求上下文可以覆盖大多数的使用情况。
其实我只是想把邮件发送这个动作移到Celery中执行。
既然用到了Celery,那么每次发邮件都多带带开一个线程似乎有点多余,异步任务还是交给Celery吧。
Celery和Flask一起使用并没有什么不和谐的地方,都可以不用定制的Flask扩展,按照网上随处可见的示例也很简单:
from flask import Flask from celery import Celery app = Flask(__name__) app.config["CELERY_BROKER_URL"] = "redis://localhost:6379/0" app.config["CELERY_RESULT_BACKEND"] = "redis://localhost:6379/0" celery = Celery(app.name, broker=app.config["CELERY_BROKER_URL"]) celery.conf.update(app.config) @celery.task def send_email(): ....
然而,稍微上点规模的Flask应用都会使用Factory模式(中文叫工厂函数,我听着特别扭),即只有在创建Flask实例时,才会初始化各种扩展,这样可以动态的修改扩展程序的配置。比如你有一套线上部署的配置和一套本地开发测试的配置,希望通过不同的启动入口,就使用不同的配置。
使用Factory模式的话,上面的代码大概要修改成这个样:
from flask import Flask from celery import Celery app = Flask(__name__) celery = Celery() def create_app(config_name): app.config.from_object(config[config_name]) celery.conf.update(app.config)
通过config_name,来动态调整celery的配置。然而,这样子是不行的!
Celery的__init__()函数会调用celery._state._register_app()直接就通过传入的配置生成了Celery实例,上面的代码中,celery = Celery()直接使用默认的amqp作为了broker,随后通过celery.conf.update(app.config)是更改不了broker的。这也就是为什么网上的示例代码中,在定义Celery实例时,就传入了broker=app.config["CELERY_BROKER_URL"],而不是之后通过celery.conf.update(app.config)传入。当你的多套配置文件中,broker设置的不同时,就悲剧了。
当然不用自己造轮子,Flask-Celery-Helper就是解决以上问题的FLask扩展。
看看它的__init__()函数:
def __init__(self, app=None): """If app argument provided then initialize celery using application config values. If no app argument provided you should do initialization later with init_app method. :param app: Flask application instance. """ self.original_register_app = _state._register_app # Backup Celery app registration function. _state._register_app = lambda _: None # Upon Celery app registration attempt, do nothing. super(Celery, self).__init__() if app is not None: self.init_app(app)
将_state._register_app函数备份,再置为空。这样__init__()就不会创建Celery实例了。但如果指定了app,那么进入init_app,嗯,大多数Flask扩展都有这个函数,用来动态生成扩展实例。
def init_app(self, app): """Actual method to read celery settings from app configuration and initialize the celery instance. :param app: Flask application instance. """ _state._register_app = self.original_register_app # Restore Celery app registration function. if not hasattr(app, "extensions"): app.extensions = dict() if "celery" in app.extensions: raise ValueError("Already registered extension CELERY.") app.extensions["celery"] = _CeleryState(self, app) # Instantiate celery and read config. super(Celery, self).__init__(app.import_name, broker=app.config["CELERY_BROKER_URL"]) ...
将_state._register_app函数还原,再执行Celery原本的__init__。这样就达到动态生成实例的目的了。接着往下看:
task_base = self.Task # Add Flask app context to celery instance. class ContextTask(task_base): """Celery instance wrapped within the Flask app context.""" def __call__(self, *_args, **_kwargs): with app.app_context(): return task_base.__call__(self, *_args, **_kwargs) setattr(ContextTask, "abstract", True) setattr(self, "Task", ContextTask)
这里重载了celery.Task类,通过with app.app_context():,在app.app_context()的上下文环境下执行Task。对于一个已生成的Flask实例,应用上下文不会随便改变。所以这就现实了在Celery中使用Flask的应用上下文。
下面是官方的示例代码:
# extensions.py from flask_celery import Celery celery = Celery() # application.py from flask import Flask from extensions import celery def create_app(): app = Flask(__name__) app.config["CELERY_IMPORTS"] = ("tasks.add_together", ) app.config["CELERY_BROKER_URL"] = "redis://localhost" app.config["CELERY_RESULT_BACKEND"] = "redis://localhost" celery.init_app(app) return app # tasks.py from extensions import celery @celery.task() def add_together(a, b): return a + b # manage.py from application import create_app app = create_app() app.run()
跟普通的Flask扩展一样了。
Celery中使用Flask上下文在Flask的view函数中调用task.delay()时,这个task相当于一个离线的异步任务,它对Flask的应用上下文和请求上下文一无所知。但是这都可能是异步任务需要用到的。比如发送邮件要用到的render_template和url_for就分别要用到应用上下文和请求上下文。不在celery中引入它们的话,就是Running code outside of a request。
引入应用上下文的工作Flask-Celery-Helper已经帮我们做好了,在Flask的文档中也有相关介绍。实现方法和上面Flask-Celery-Helper的一样。然而,不管是Flask-Celery-Helper还是Flask文档,都没有提及如何在Celery中使用请求上下文。
要引入请求上下文,需要考虑这两个问题:
如何在Celery中产生请求上下文。Flask中有request_context和test_request_context可以产生请求上下文。区别是request_context需要WSGI环境变量environ,而test_request_context根据传入的参数生成请求上下文。我没有找到如何在Celery中获取到WSGI环境变量的方法,所以只能自己传入相关参数生成请求上下文了。
请求上下文是随HTTP请求产生的,要获取请求上下文,就必须在view函数中处理,view函数通过task.delay()发送Celery任务。所以需要重载task.delay(),以获取请求上下文。
具体的思路还是在init_app中重载celery.Task类,通过with app.test_request_context():,在app.test_request_context()的上下文环境下执行Task。
首先获取request,从中整理出test_request_context()需要的参数。根据test_request_context的函数注释,它需要的参数和werkzeug.test.EnvironBuilder类的参数一样。
CONTEXT_ARG_NAME = "_flask_request_context" def _include_request_context(self, kwargs): """Includes all the information about current Flask request context as an additional argument to the task. """ if not has_request_context(): return # keys correspond to arguments of :meth:`Flask.test_request_context` context = { "path": request.path, "base_url": request.url_root, "method": request.method, "headers": dict(request.headers), "data": request.form } if "?" in request.url: context["query_string"] = request.url[(request.url.find("?") + 1):] kwargs[self.CONTEXT_ARG_NAME] = context
_include_request_context函数从request中提取path,base_url,method,headers,data,query_string。将他们传入test_request_context,生成伪造的请求上下文可以覆盖大多数的使用情况。
Celery通过apply_async,apply,retry调用异步任务(delay是apply_async的简化方法)。这里需要重载它们,让这些函数获取request:
def apply_async(self, args=None, kwargs=None, **rest): self._include_request_context(kwargs) return super(ContextTask, self).apply_async(args, kwargs, **rest) def apply(self, args=None, kwargs=None, **rest): self._include_request_context(kwargs) return super(ContextTask, self).apply(args, kwargs, **rest) def retry(self, args=None, kwargs=None, **rest): self._include_request_context(kwargs) return super(ContextTask, self).retry(args, kwargs, **rest)
最后重载celery.Task的__call__方法:
def __call__(self, *args, **kwargs): """Execute task code with given arguments.""" call = lambda: super(ContextTask, self).__call__(*args, **kwargs) context = kwargs.pop(self.CONTEXT_ARG_NAME, None) if context is None or has_request_context(): return call() with app.test_request_context(**context): result = call() # process a fake "Response" so that # ``@after_request`` hooks are executed app.process_response(make_response(result or "")) return result
context是我们从request中获取的参数,将它传给test_request_context,伪造请求上下文,并在这个上下文环境中执行task。既然伪造了请求,那也得为这个假请求生成响应,万一你定义了after_request这个在响应后执行的钩子呢?通过process_response就可以激活after_request。
注意这里并没有传入应用上下文,因为Flask在创建请求上下文时,会判断应用上下文是否为空,为空就先创建应用上下文,再创建请求上下文。
完整代码在这里。
celery = CeleryWithContext()创建的Celery实例就可以给各种task使用了。
另外创建一个celery_worker.py文件,生成一个Flask实例,供Celery的worker使用。
# celery_worker.py #!/usr/bin/env python from app import create_app from app.extensions import celery app = create_app()
启动worker:celery -A celery_worker.celery worker -l info
这下就可以使用Celery发邮件了。唉,还真是麻烦。
http://xion.io/post/code/celery-include-flask-request-context.html
博客地址
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/38572.html
摘要:目的曾经想向前台实时返回任务的状态监控,也查看了很多博客,但是好多也没能如愿,因此基于网上已有的博客已经自己的尝试,写了一个小的,实现前台实时获取后台传输的任务状态。实现仿照其他例子实现了一个简单的后台任务监控。 1. 目的曾经想向前台实时返回Celery任务的状态监控,也查看了很多博客,但是好多也没能如愿,因此基于网上已有的博客已经自己的尝试,写了一个小的demo,实现前台实时获取后...
摘要:使用异步框架,例如等等,装饰异步任务。它是一个专注于实时处理的任务队列,同时也支持任务调度。不存储任务状态。标识要使用的默认序列化方法的字符串。指定该任务的结果存储后端用于此任务。 概述: 我们考虑一个场景,公司有一个需求,现在需要做一套web系统,而这套系统某些功能需要使用...
摘要:基于网,分享项目的组网架构和部署。项目组网架构架构说明流项目访问分为两个流,通过分两个端口暴露给外部使用数据流用户访问网站。通过进行配置,使用作为异步队列来存储任务,并将处理结果存储在中。 基于Raindrop网,分享项目的组网架构和部署。 项目组网架构 showImg(https://cloud.githubusercontent.com/assets/7239657/1015704...
摘要:解决办法如下测试表格我们从引入,首先对文件名进行编码,然后中作为的参数,这时候能成功下载文件,但是文件名是编码后的名字,要解码的话,我们需要在里面声明编码格式,即这样的话,对文件名进行解码,我们的文件名就是中文了。 在写 flask 后端的时候,特别是在做数据相关的操作的时候,产品往往需要我们做一个导出数据的需求,一般都是导出 excel 格式的文件。 那在 flask 上,如何实现请...
摘要:本文将介绍如何使用和抓取主流的技术博客文章,然后用搭建一个小型的技术文章聚合平台。是谷歌开源的基于和的自动化测试工具,可以很方便的让程序模拟用户的操作,对浏览器进行程序化控制。相对于,是新的开源项目,而且是谷歌开发,可以使用很多新的特性。 背景 说到爬虫,大多数程序员想到的是scrapy这样受人欢迎的框架。scrapy的确不错,而且有很强大的生态圈,有gerapy等优秀的可视化界面。但...
阅读 3695·2021-10-15 09:42
阅读 2548·2021-09-03 10:50
阅读 1572·2021-09-03 10:28
阅读 1751·2019-08-30 15:54
阅读 2461·2019-08-30 12:46
阅读 372·2019-08-30 11:06
阅读 2764·2019-08-30 10:54
阅读 492·2019-08-29 12:59