资讯专栏INFORMATION COLUMN

如何在多个queue多台server上部署Celery 以及任务状态监控flower

goji / 2230人阅读

摘要:是分布式任务队列,能实时处理任务,同时支持官方文档工作原理如下发送给从中消费消息,并将结果存储在中本文中使用的是,使用的是现在有两个,分别是加法运算和乘法运算。假定乘法运算的事件优先级高事件也很多,对于加法运算,要求每分钟最多处理个事件。

Celery是分布式任务队列,能实时处理任务, 同时支持task scheduling. 官方文档
Celery工作原理如下:

celery client发送message给broker

worker 从broker中消费消息,并将结果存储在result_end中

本文中使用的broker是Rabbit MQ,result_end使用的是Redis.

Scenario

现在有两个task,分别是加法运算和乘法运算。假定乘法运算的事件优先级高&事件也很多,对于加法运算,要求每分钟最多处理10个事件。

框架

Celery Worker:
在2 台server上部署worker,其中:
server1上的worker处理queue priority_low和priority_high上的事件
server2上的worker只处理priority_high上的事件

Celery Client:在应用中调用

Rabbit MQ:在server3上启动

Redis:在localhost启动

Code tasks.py & callback

对两个任务加上callback的处理,如果成功,打印“----[task_id] is done”

from celery import Celery
from kombu import Queue
import time


app = Celery("tasks", backend="redis://127.0.0.1:6379/6")
app.config_from_object("celeryconfig")


class CallbackTask(Task):
    def on_success(self, retval, task_id, args, kwargs):
        print "----%s is done" % task_id

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        pass

@app.task(base=CallbackTask) 
def add(x, y):
    return x + y


@app.task(base=CallbackTask) 
def multiply(x,y):
    return x * y
celeryconfig.py
from kombu import Queue
from kombu import Exchange

result_serializer = "json"


broker_url = "amqp://guest:guest@192.168.xx.xxx:5672/%2f"

task_queues = (
    Queue("priority_low",  exchange=Exchange("priority", type="direct"), routing_key="priority_low"),
    Queue("priority_high",  exchange=Exchange("priority", type="direct"), routing_key="priority_high"),
)

task_routes = ([
    ("tasks.add", {"queue": "priority_low"}),
    ("tasks.multiply", {"queue": "priority_high"}),
],)

task_annotations = {
    "tasks.add": {"rate_limit": "10/m"}
}
Celery Server and Client Worker on Server1

消费priority_high事件

celery -A tasks worker -Q priority_high --concurrency=4 -l info -E -n worker1@%h
Worker on Server2

消费priority_high和priority_low事件

celery -A tasks worker -Q priority_high,priority_low --concurrency=4  -l info -E -n worker2@%h
Client

生产者,pushlish 事件到broker

from tasks import add
from tasks import multiply


for i in xrange(50):
    add.delay(2, 2)
    multiply.delay(10,10)
监控 install
pip install flower
启动flower

假设在server2上启动flower,flower默认的端口是5555.

celery  flower --broker=amqp://guest:guest@192.168.xx.xxx:5672//
监控界面

在浏览器上输入 http://server2_ip:5555, 可以看到如下界面:
从queued tasks途中,可以看出 priority_high中的task先消费完,和预期是一样的。

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/38606.html

相关文章

  • Celery实际使用与内存泄漏问题(面试)

    摘要:结论执行完任务不释放内存与原一直没有被销毁有关,因此可以适当配置小点,而任务并发数与配置项有关,每增加一个必然增加内存消耗,同时也影响到一个何时被销毁,因为是均匀调度任务至每个,因此也不宜配置过大,适当配置。 1.实际使用 ​ 监控task的执行结果:任务id,结果,traceback,children,任务状态 ​ 配置 backend=redis://127...

    0x584a 评论0 收藏0
  • 手把手教你如何用Crawlab构建技术文章聚合平台(一)

    摘要:本文将介绍如何使用和抓取主流的技术博客文章,然后用搭建一个小型的技术文章聚合平台。是谷歌开源的基于和的自动化测试工具,可以很方便的让程序模拟用户的操作,对浏览器进行程序化控制。相对于,是新的开源项目,而且是谷歌开发,可以使用很多新的特性。 背景 说到爬虫,大多数程序员想到的是scrapy这样受人欢迎的框架。scrapy的确不错,而且有很强大的生态圈,有gerapy等优秀的可视化界面。但...

    LinkedME2016 评论0 收藏0
  • 手把手教你如何用Crawlab构建技术文章聚合平台(一)

    摘要:本文将介绍如何使用和抓取主流的技术博客文章,然后用搭建一个小型的技术文章聚合平台。是谷歌开源的基于和的自动化测试工具,可以很方便的让程序模拟用户的操作,对浏览器进行程序化控制。相对于,是新的开源项目,而且是谷歌开发,可以使用很多新的特性。 背景 说到爬虫,大多数程序员想到的是scrapy这样受人欢迎的框架。scrapy的确不错,而且有很强大的生态圈,有gerapy等优秀的可视化界面。但...

    Jeffrrey 评论0 收藏0

发表评论

0条评论

goji

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<