摘要:是分布式任务队列,能实时处理任务,同时支持官方文档工作原理如下发送给从中消费消息,并将结果存储在中本文中使用的是,使用的是现在有两个,分别是加法运算和乘法运算。假定乘法运算的事件优先级高事件也很多,对于加法运算,要求每分钟最多处理个事件。
Celery是分布式任务队列,能实时处理任务, 同时支持task scheduling. 官方文档
Celery工作原理如下:
celery client发送message给broker
worker 从broker中消费消息,并将结果存储在result_end中
本文中使用的broker是Rabbit MQ,result_end使用的是Redis.
Scenario现在有两个task,分别是加法运算和乘法运算。假定乘法运算的事件优先级高&事件也很多,对于加法运算,要求每分钟最多处理10个事件。
框架Celery Worker:
在2 台server上部署worker,其中:
server1上的worker处理queue priority_low和priority_high上的事件
server2上的worker只处理priority_high上的事件
Celery Client:在应用中调用
Rabbit MQ:在server3上启动
Redis:在localhost启动
Code tasks.py & callback对两个任务加上callback的处理,如果成功,打印“----[task_id] is done”
from celery import Celery from kombu import Queue import time app = Celery("tasks", backend="redis://127.0.0.1:6379/6") app.config_from_object("celeryconfig") class CallbackTask(Task): def on_success(self, retval, task_id, args, kwargs): print "----%s is done" % task_id def on_failure(self, exc, task_id, args, kwargs, einfo): pass @app.task(base=CallbackTask) def add(x, y): return x + y @app.task(base=CallbackTask) def multiply(x,y): return x * yceleryconfig.py
from kombu import Queue from kombu import Exchange result_serializer = "json" broker_url = "amqp://guest:guest@192.168.xx.xxx:5672/%2f" task_queues = ( Queue("priority_low", exchange=Exchange("priority", type="direct"), routing_key="priority_low"), Queue("priority_high", exchange=Exchange("priority", type="direct"), routing_key="priority_high"), ) task_routes = ([ ("tasks.add", {"queue": "priority_low"}), ("tasks.multiply", {"queue": "priority_high"}), ],) task_annotations = { "tasks.add": {"rate_limit": "10/m"} }Celery Server and Client Worker on Server1
消费priority_high事件
celery -A tasks worker -Q priority_high --concurrency=4 -l info -E -n worker1@%hWorker on Server2
消费priority_high和priority_low事件
celery -A tasks worker -Q priority_high,priority_low --concurrency=4 -l info -E -n worker2@%hClient
生产者,pushlish 事件到broker
from tasks import add from tasks import multiply for i in xrange(50): add.delay(2, 2) multiply.delay(10,10)监控 install
pip install flower启动flower
假设在server2上启动flower,flower默认的端口是5555.
celery flower --broker=amqp://guest:guest@192.168.xx.xxx:5672//监控界面
在浏览器上输入 http://server2_ip:5555, 可以看到如下界面:
从queued tasks途中,可以看出 priority_high中的task先消费完,和预期是一样的。
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/38606.html
摘要:结论执行完任务不释放内存与原一直没有被销毁有关,因此可以适当配置小点,而任务并发数与配置项有关,每增加一个必然增加内存消耗,同时也影响到一个何时被销毁,因为是均匀调度任务至每个,因此也不宜配置过大,适当配置。 1.实际使用 监控task的执行结果:任务id,结果,traceback,children,任务状态 配置 backend=redis://127...
摘要:本文将介绍如何使用和抓取主流的技术博客文章,然后用搭建一个小型的技术文章聚合平台。是谷歌开源的基于和的自动化测试工具,可以很方便的让程序模拟用户的操作,对浏览器进行程序化控制。相对于,是新的开源项目,而且是谷歌开发,可以使用很多新的特性。 背景 说到爬虫,大多数程序员想到的是scrapy这样受人欢迎的框架。scrapy的确不错,而且有很强大的生态圈,有gerapy等优秀的可视化界面。但...
摘要:本文将介绍如何使用和抓取主流的技术博客文章,然后用搭建一个小型的技术文章聚合平台。是谷歌开源的基于和的自动化测试工具,可以很方便的让程序模拟用户的操作,对浏览器进行程序化控制。相对于,是新的开源项目,而且是谷歌开发,可以使用很多新的特性。 背景 说到爬虫,大多数程序员想到的是scrapy这样受人欢迎的框架。scrapy的确不错,而且有很强大的生态圈,有gerapy等优秀的可视化界面。但...
阅读 1470·2021-11-24 09:39
阅读 1785·2021-11-22 15:25
阅读 3738·2021-11-19 09:40
阅读 3298·2021-09-22 15:31
阅读 1297·2021-07-29 13:49
阅读 1207·2019-08-26 11:59
阅读 1319·2019-08-26 11:39
阅读 933·2019-08-26 11:00