资讯专栏INFORMATION COLUMN

django开发-使用celery搭建分布式(多节点)任务队列

ConardLi / 3634人阅读

摘要:今天介绍一下如何在项目中使用搭建一个有两个节点的任务队列一个主节点一个子节点主节点发布任务,子节点收到任务并执行。

今天介绍一下如何在django项目中使用celery搭建一个有两个节点的任务队列(一个主节点一个子节点;主节点发布任务,子节点收到任务并执行。搭建3个或者以上的节点就类似了),使用到了celery,rabbitmq。这里不会多带带介绍celery和rabbitmq中的知识了。

1.项目基础环境:
两个ubuntu18.04虚拟机、python3.6.5、django2.0.4、celery3.1.26post2

2.主节点django项目结构:

3.settings.py中关于celery的配置:

import djcelery
# 此处的Queue和Exchange都涉及到RabbitMQ中的概念,这里不做介绍
from kombu import Queue, Exchange
djcelery.setup_loader()
BROKER_URL = "amqp://test:test@192.168.43.6:5672/testhost"
CELERY_RESULT_BACKEND = "amqp://test:test@192.168.43.6:5672/testhost"

CELERY_TASK_RESULT_EXPIRES=3600
CELERY_TASK_SERIALIZER="json"
CELERY_RESULT_SERIALIZER="json"
# CELERY_ACCEPT_CONTENT = ["json", "pickle", "msgpack", "yaml"]

CELERY_DEFAULT_EXCHANGE = "train"
CELERY_DEFAULT_EXCHANGE_TYPE = "direct"

CELERY_IMPORTS = ("proj.celery1.tasks", )

CELERY_QUEUES = (
  Queue("train", routing_key="train"),
  Queue("predict", routing_key="predict"),
)

4.celery.py中的配置:

# coding:utf8
from __future__ import absolute_import

import os

from celery import Celery
from django.conf import settings

# set the default Django settings module for the "celery" program.

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "proj.settings")

app = Celery("proj")

# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.config_from_object("django.conf:settings")
# app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
app.autodiscover_tasks(settings.INSTALLED_APPS)


@app.task(bind=True)
def debug_task(self):
    print("Request: {0!r}".format(self.request))

5.proj/init.py中的配置:

from __future__ import absolute_import
from .celery import app as celery_app

6.celery1/tasks.py:(主节点中的任务不会执行,只执行子节点中的任务)

from __future__ import absolute_import
from celery import task


@task
def do_train(x, y):
    return x + y

7.celery1/views.py:

from .tasks import do_train
class Test1View(APIView):
    def get(self, request):
        try:
            # 这里的queue和routing_key也涉及到RabiitMQ中的知识
            # 关键,在这里控制向哪个queue中发送任务,子节点通过这个执行对应queue中的任务
            ret = do_train.apply_async(args=[4, 2], queue="train", routing_key="train")
            # 获取结果
            data = ret.get()
        except Exception as e:
            return Response(dict(msg=str(e), code=10001))
        return Response(dict(msg="OK", code=10000, data=data))

8.子节点目录结构:

9.子节点中celery1/celery.py:

from __future__ import absolute_import
from celery import Celery
CELERY_IMPORTS = ("celery1.tasks", )
app = Celery("myapp",
             # 此处涉及到RabbitMQ的知识,RabbitMQ是对应主节点上的
             broker="amqp://test:test@192.168.43.6:5672/testhost",
             backend="amqp://test:test@192.168.43.6:5672/testhost",
             include=["celery1.tasks"])

app.config_from_object("celery1.config")

if __name__ == "__main__":
  app.start()

10.子节点中celery1/config.py:

from __future__ import absolute_import
from kombu import Queue,Exchange
from datetime import timedelta

CELERY_TASK_RESULT_EXPIRES=3600
CELERY_TASK_SERIALIZER="json"
CELERY_RESULT_SERIALIZER="json"
CELERY_ACCEPT_CONTENT = ["json","pickle","msgpack","yaml"]

CELERY_DEFAULT_EXCHANGE = "train"
# exchange type可以看RabbitMQ中的相关内容
CELERY_DEFAULT_EXCHANGE_TYPE = "direct"

CELERT_QUEUES =  (
  Queue("train",exchange="train",routing_key="train"),
)

11.子节点celery1/tasks.py:(这个是要真正执行的task,每个节点可以不同)

from __future__ import absolute_import
from celery1.celery import app


import time
from celery import task


@task
def do_train(x, y):
    """
    训练
    :param data:
    :return:
    """
    time.sleep(3)
    return dict(data=str(x+y),msg="train")

12.启动子节点中的celery:
celery1是项目,-Q train表示从train这个queue中接收任务

celery -A celery1 worker -l info -Q train

13.启动主节点中的django项目:

python manage.py runserver

14.使用Postman请求对应的view

请求url:http://127.0.0.1:8000/api/v1/celery1/test/
返回的结果是:
{
    "msg": "OK",
    "code": 10000,
    "data": {
        "data": "6",
        "msg": "train"
    }
}

15.遇到的问题:
1)celery队列报错: AttributeError: ‘str’ object has no attribute ‘items’
解决:将redis库从3.0回退到了2.10,pip install redis==2.10
解决方法参考链接:https://stackoverflow.com/que...

今天就说到这里,如有疑问,欢迎交流。

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/42714.html

相关文章

  • Django使用celery 异步发送短信验证码

    摘要:介绍应用举例是一个基于开发的分布式异步消息任务队列,通过它可以轻松的实现任务的异步处理,如果你的业务场景中需要用到异步任务,就可以考虑使用你想对台机器执行一条批量命令,可能会花很长时间,但你不想让你的程序等着结果返回,  celery 1.celery介绍 1.1 celery应用举例 Celery 是一个 基于python开发的分布式异步消息任务队列,通过...

    everfly 评论0 收藏0
  • celery 使用

    摘要:使用消息来通信,流程为客户端添加消息到队列来初始化一个任务,然后消息队列系统把消息分发给工作进程。可以包含多个工作进程和消息系统,来保证高可用性和进行水平扩展。保存结果可以使用很多例如的,,。 celery是一个简单的、灵活的、可靠的分布式系统,提供了工具来维护这样一个系统,用于处理大量的信息(实时信息、定时任务安排),是一个任务队列,易于使用,易于和其他语言进行配合。 任务队列 任务...

    GraphQuery 评论0 收藏0
  • Celery任务队列

    摘要:文档中文文档官方文档定时服务与结合使用简介是一个自带电池的的任务队列。追踪任务在不同状态间的迁移,并检视返回值。 文档 中文文档 官方文档 celery定时服务、celery与django结合使用 简介 Celery 是一个自带电池的的任务队列。它易于使用,所以你可以无视其所解决问题的复杂程度而轻松入门。它遵照最佳实践设计,所以你的产品可以扩展,或与其他语言集成,并且它自带了在生产...

    Lorry_Lu 评论0 收藏0
  • Djangocelery使用项目实例

      小编写这篇文章的主要目的,主要是给大家去进行讲解Django项目实例情况,包括celery的一些具体使用情况介绍,学习这些的话,对我们的工作和生活帮助还是很大的,但是怎么样才能够更快的进行上手呢?下面就一个具体实例给大家进行解答。  1、django应用Celery  django框架请求/响应的过程是同步的,框架本身无法实现异步响应。  但是我们在项目过程中会经常会遇到一些耗时的任务,比如:...

    89542767 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<