摘要:今天介绍一下如何在项目中使用搭建一个有两个节点的任务队列一个主节点一个子节点主节点发布任务,子节点收到任务并执行。
今天介绍一下如何在django项目中使用celery搭建一个有两个节点的任务队列(一个主节点一个子节点;主节点发布任务,子节点收到任务并执行。搭建3个或者以上的节点就类似了),使用到了celery,rabbitmq。这里不会多带带介绍celery和rabbitmq中的知识了。
1.项目基础环境:
两个ubuntu18.04虚拟机、python3.6.5、django2.0.4、celery3.1.26post2
2.主节点django项目结构:
3.settings.py中关于celery的配置:
import djcelery # 此处的Queue和Exchange都涉及到RabbitMQ中的概念,这里不做介绍 from kombu import Queue, Exchange djcelery.setup_loader() BROKER_URL = "amqp://test:test@192.168.43.6:5672/testhost" CELERY_RESULT_BACKEND = "amqp://test:test@192.168.43.6:5672/testhost" CELERY_TASK_RESULT_EXPIRES=3600 CELERY_TASK_SERIALIZER="json" CELERY_RESULT_SERIALIZER="json" # CELERY_ACCEPT_CONTENT = ["json", "pickle", "msgpack", "yaml"] CELERY_DEFAULT_EXCHANGE = "train" CELERY_DEFAULT_EXCHANGE_TYPE = "direct" CELERY_IMPORTS = ("proj.celery1.tasks", ) CELERY_QUEUES = ( Queue("train", routing_key="train"), Queue("predict", routing_key="predict"), )
4.celery.py中的配置:
# coding:utf8 from __future__ import absolute_import import os from celery import Celery from django.conf import settings # set the default Django settings module for the "celery" program. os.environ.setdefault("DJANGO_SETTINGS_MODULE", "proj.settings") app = Celery("proj") # Using a string here means the worker will not have to # pickle the object when using Windows. app.config_from_object("django.conf:settings") # app.autodiscover_tasks(lambda: settings.INSTALLED_APPS) app.autodiscover_tasks(settings.INSTALLED_APPS) @app.task(bind=True) def debug_task(self): print("Request: {0!r}".format(self.request))
5.proj/init.py中的配置:
from __future__ import absolute_import from .celery import app as celery_app
6.celery1/tasks.py:(主节点中的任务不会执行,只执行子节点中的任务)
from __future__ import absolute_import from celery import task @task def do_train(x, y): return x + y
7.celery1/views.py:
from .tasks import do_train class Test1View(APIView): def get(self, request): try: # 这里的queue和routing_key也涉及到RabiitMQ中的知识 # 关键,在这里控制向哪个queue中发送任务,子节点通过这个执行对应queue中的任务 ret = do_train.apply_async(args=[4, 2], queue="train", routing_key="train") # 获取结果 data = ret.get() except Exception as e: return Response(dict(msg=str(e), code=10001)) return Response(dict(msg="OK", code=10000, data=data))
8.子节点目录结构:
9.子节点中celery1/celery.py:
from __future__ import absolute_import from celery import Celery CELERY_IMPORTS = ("celery1.tasks", ) app = Celery("myapp", # 此处涉及到RabbitMQ的知识,RabbitMQ是对应主节点上的 broker="amqp://test:test@192.168.43.6:5672/testhost", backend="amqp://test:test@192.168.43.6:5672/testhost", include=["celery1.tasks"]) app.config_from_object("celery1.config") if __name__ == "__main__": app.start()
10.子节点中celery1/config.py:
from __future__ import absolute_import from kombu import Queue,Exchange from datetime import timedelta CELERY_TASK_RESULT_EXPIRES=3600 CELERY_TASK_SERIALIZER="json" CELERY_RESULT_SERIALIZER="json" CELERY_ACCEPT_CONTENT = ["json","pickle","msgpack","yaml"] CELERY_DEFAULT_EXCHANGE = "train" # exchange type可以看RabbitMQ中的相关内容 CELERY_DEFAULT_EXCHANGE_TYPE = "direct" CELERT_QUEUES = ( Queue("train",exchange="train",routing_key="train"), )
11.子节点celery1/tasks.py:(这个是要真正执行的task,每个节点可以不同)
from __future__ import absolute_import from celery1.celery import app import time from celery import task @task def do_train(x, y): """ 训练 :param data: :return: """ time.sleep(3) return dict(data=str(x+y),msg="train")
12.启动子节点中的celery:
celery1是项目,-Q train表示从train这个queue中接收任务
celery -A celery1 worker -l info -Q train
13.启动主节点中的django项目:
python manage.py runserver
14.使用Postman请求对应的view
请求url:http://127.0.0.1:8000/api/v1/celery1/test/ 返回的结果是: { "msg": "OK", "code": 10000, "data": { "data": "6", "msg": "train" } }
15.遇到的问题:
1)celery队列报错: AttributeError: ‘str’ object has no attribute ‘items’
解决:将redis库从3.0回退到了2.10,pip install redis==2.10
解决方法参考链接:https://stackoverflow.com/que...
今天就说到这里,如有疑问,欢迎交流。
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/42714.html
摘要:介绍应用举例是一个基于开发的分布式异步消息任务队列,通过它可以轻松的实现任务的异步处理,如果你的业务场景中需要用到异步任务,就可以考虑使用你想对台机器执行一条批量命令,可能会花很长时间,但你不想让你的程序等着结果返回, celery 1.celery介绍 1.1 celery应用举例 Celery 是一个 基于python开发的分布式异步消息任务队列,通过...
摘要:使用消息来通信,流程为客户端添加消息到队列来初始化一个任务,然后消息队列系统把消息分发给工作进程。可以包含多个工作进程和消息系统,来保证高可用性和进行水平扩展。保存结果可以使用很多例如的,,。 celery是一个简单的、灵活的、可靠的分布式系统,提供了工具来维护这样一个系统,用于处理大量的信息(实时信息、定时任务安排),是一个任务队列,易于使用,易于和其他语言进行配合。 任务队列 任务...
摘要:文档中文文档官方文档定时服务与结合使用简介是一个自带电池的的任务队列。追踪任务在不同状态间的迁移,并检视返回值。 文档 中文文档 官方文档 celery定时服务、celery与django结合使用 简介 Celery 是一个自带电池的的任务队列。它易于使用,所以你可以无视其所解决问题的复杂程度而轻松入门。它遵照最佳实践设计,所以你的产品可以扩展,或与其他语言集成,并且它自带了在生产...
小编写这篇文章的主要目的,主要是给大家去进行讲解Django项目实例情况,包括celery的一些具体使用情况介绍,学习这些的话,对我们的工作和生活帮助还是很大的,但是怎么样才能够更快的进行上手呢?下面就一个具体实例给大家进行解答。 1、django应用Celery django框架请求/响应的过程是同步的,框架本身无法实现异步响应。 但是我们在项目过程中会经常会遇到一些耗时的任务,比如:...
阅读 3692·2023-04-25 17:45
阅读 3349·2021-09-04 16:40
阅读 982·2019-08-30 13:54
阅读 2103·2019-08-29 12:59
阅读 1375·2019-08-26 12:11
阅读 3257·2019-08-23 15:17
阅读 1490·2019-08-23 12:07
阅读 3851·2019-08-22 18:00