资讯专栏INFORMATION COLUMN

Django中celery的使用项目实例

89542767 / 559人阅读

  小编写这篇文章的主要目的,主要是给大家去进行讲解Django项目实例情况,包括celery的一些具体使用情况介绍,学习这些的话,对我们的工作和生活帮助还是很大的,但是怎么样才能够更快的进行上手呢?下面就一个具体实例给大家进行解答。


  1、django应用Celery


  django框架请求/响应的过程是同步的,框架本身无法实现异步响应。


  但是我们在项目过程中会经常会遇到一些耗时的任务,比如:发送邮件、发送短信、大数据统计等等,这些操作耗时长,同步执行对用户体验非常不友好,那么在这种情况下就需要实现异步执行。


  异步执行前端一般使用ajax,后端使用Celery。


  2、项目应用


  django项目应用celery,主要有两种任务方式,一是异步任务(发布者任务),一般是web请求,二是定时任务。


  celery组成


  Celery是由Python开发、简单、灵活、可靠的分布式任务队列,是一个处理异步任务的框架,其本质是生产者消费者模型,生产者发送任务到消息队列,消费者负责处理任务。Celery侧重于实时操作,但对调度支持也很好,其每天可以处理数以百万计的任务。特点:


  简单:熟悉celery的工作流程后,配置使用简单


  高可用:当任务执行失败或执行过程中发生连接中断,celery会自动尝试重新执行任务


  快速:一个单进程的celery每分钟可处理上百万个任务


  灵活:几乎celery的各个组件都可以被扩展及自定制


  Celery由三部分构成:


  消息中间件(Broker):官方提供了很多备选方案,支持RabbitMQ、Redis、Amazon SQS、MongoDB、Memcached等,官方推荐RabbitMQ


  任务执行单元(Worker):任务执行单元,负责从消息队列中取出任务执行,它可以启动一个或者多个,也可以启动在不同的机器节点,这就是其实现分布式的核心


  结果存储(Backend):官方提供了诸多的存储方式支持:RabbitMQ、Redis、Memcached,SQLAlchemy,Django ORM、Apache Cassandra、Elasticsearch等


  架构如下:

01.png

  工作原理:


  任务模块Task包含异步任务和定时任务。其中,异步任务通常在业务逻辑中被触发并发往消息队列,而定时任务由Celery Beat进程周期性地将任务发往消息队列;


  任务执行单元Worker实时监视消息队列获取队列中的任务执行;


  Woker执行完任务后将结果保存在Backend中;


  本文使用的是redis数据库作为消息中间件和结果存储数据库


  1.异步任务redis


  1.安装库


  pip install celery
  pip install redis


  2.celery.py


  在主项目目录下,新建celery.py文件:


  import os
  import django
  from celery import Celery
  from django.conf import settings
  #设置系统环境变量,安装django,必须设置,否则在启动celery时会报错
  #celery_study是当前项目名
  os.environ.setdefault('DJANGO_SETTINGS_MODULE','celery_study.settings')
  django.setup()
  celery_app=Celery('celery_study')
  celery_app.config_from_object('django.conf:settings')
  celery_app.autodiscover_tasks(lambda:settings.INSTALLED_APPS)

 02.png

    注意:是和settings.py文件同目录,一定不能建立在项目根目录,不然会引起celery这个模块名的命名冲突


  同时,在主项目的init.py中,添加如下代码:


  from.celery import celery_app
  __all__=['celery_app']

03.png

  3.settings.py


  在配置文件中配置对应的redis配置:


  #Broker配置,使用Redis作为消息中间件
  BROKER_URL='redis://127.0.0.1:6379/0'
  #BACKEND配置,这里使用redis
  CELERY_RESULT_BACKEND='redis://127.0.0.1:6379/0'
  #结果序列化方案
  CELERY_RESULT_SERIALIZER='json'
  #任务结果过期时间,秒
  CELERY_TASK_RESULT_EXPIRES=60*60*24
  #时区配置
  CELERY_TIMEZONE='Asia/Shanghai'
  #指定导入的任务模块,可以指定多个
  #CELERY_IMPORTS=(
  #'other_dir.tasks',
  #)

04.png

  注意:所有配置的官方文档:Configuration and defaults—Celery 5.2.0b3 documentation


  4.tasks.py


  在子应用下建立各自对应的任务文件tasks.py(必须是tasks.py这个名字,不允许修改)


  from celery import shared_task
  shared_task
  def add(x,y):
  return x+y
  shared_task
  def mul(x,y):
  return x*y
  shared_task
  def xsum(numbers):
  return sum(numbers)

05.png

  5.调用任务


  from.tasks import*
  #Create your views here.
  def task_add_view(request):
  add.delay(100,200)
  return HttpResponse(f'调用函数结果')

06.png

  6.启动celery


  pip install eventlet
  celery-A celery_study worker-l debug-P eventlet

  注意:celery_study是项目名


  使用redis时,有可能会出现如下类似的异常

  AttributeError:'str'object has no attribute'items'


  这是由于版本差异,需要卸载已经安装的python环境中的redis库,重新指定安装特定版本(celery4.x以下适用redis2.10.6,celery4.3以上使用redis3.2.0以上):

  xxxxxxxxxx pip install redis==2.10.6


  7.获取任务结果


  在views.py中,通过AsyncResult.get()获取结果


  from celery import result
  def get_result_by_taskid(request):
  task_id=request.GET.get('task_id')
  #异步执行
  ar=result.AsyncResult(task_id)
  if ar.ready():
  return JsonResponse({'status':ar.state,'result':ar.get()})
  else:
  return JsonResponse({'status':ar.state,'result':''})

  

      AsyncResult类的常用的属性和方法:


  state:返回任务状态,等同status;


  task_id:返回任务id;


  result:返回任务结果,同get()方法;


  ready():判断任务是否执行以及有结果,有结果为True,否则False;


  info():获取任务信息,默认为结果;


  wait(t):等待t秒后获取结果,若任务执行完毕,则不等待直接获取结果,若任务在执行中,则wait期间一直阻塞,直到超时报错;


  successful():判断任务是否成功,成功为True,否则为False;


  2.定时任务


  在第一步的异步任务的基础上,进行部分修改即可


  1.settings.py


  from celery.schedules import crontab
  CELERYBEAT_SCHEDULE={
  'mul_every_30_seconds':{
  #任务路径
  'task':'celery_app.tasks.mul',
  #每30秒执行一次
  'schedule':5,
  'args':(14,5)
  }
  }

08.png

  说明(更多内容见文档:Periodic Tasks—Celery 5.2.0b3 documentation):


  task:任务函数


  schedule:执行频率,可以是整型(秒数),也可以是timedelta对象,也可以是crontab对象,也可以是自定义类(继承celery.schedules.schedule)


  args:位置参数,列表或元组


  kwargs:关键字参数,字典


  options:可选参数,字典,任何apply_async()支持的参数


  relative:默认是False,取相对于beat的开始时间;设置为True,则取设置的timedelta时间


  在task.py中设置了日志


  from celery import shared_task
  import logging
  logger=logging.getLogger(__name__))
  shared_task
  def mul(x,y):
  logger.info('___mul__'*10)
  return x*y


  2.启动celery


  (两个cmd)分别启动worker和beat


  celery-A worker celery_study-l debug-P eventlet
  celery beat-A celery_study-l debug


  3.任务绑定


  Celery可通过task绑定到实例获取到task的上下文,这样我们可以在task运行时候获取到task的状态,记录相关日志等


  方法:


  在装饰器中加入参数bind=True


  在task函数中的第一个参数设置为self


  在task.py里面写


  from celery import shared_task
  import logging
  logger=logging.getLogger(__name__)
  #任务绑定
  shared_task(bind=True)
  def add(self,x,y):
  logger.info('add__-----'*10)
  logger.info('name:',self.name)
  logger.info('dir(self)',dir(self))
  return x+y


  其中:self对象是celery.app.task.Task的实例,可以用于实现重试等多种功能


  from celery import shared_task
  import logging
  logger=logging.getLogger(__name__)
  #任务绑定
  shared_task(bind=True)
  def add(self,x,y):
  try:
  logger.info('add__-----'*10)
  logger.info('name:',self.name)
  logger.info('dir(self)',dir(self))
  raise Exception
  except Exception as e:
  #出错每4秒尝试一次,总共尝试4次
  self.retry(exc=e,countdown=4,max_retries=4)
  return x+y
  启动celery
  celery-A worker celery_study-l debug-P eventlet


  4.任务钩子


  Celery在执行任务时,提供了钩子方法用于在任务执行完成时候进行对应的操作,在Task源码中提供了很多状态钩子函数如:on_success(成功后执行)、on_failure(失败时候执行)、on_retry(任务重试时候执行)、after_return(任务返回时候执行)


  方法:通过继承Task类,重写对应方法即可,


  from celery import Task
  class MyHookTask(Task):
  def on_success(self,retval,task_id,args,kwargs):
  logger.info(f'task id:{task_id},arg:{args},successful!')
  def on_failure(self,exc,task_id,args,kwargs,einfo):
  logger.info(f'task id:{task_id},arg:{args},failed!erros:{exc}')
  def on_retry(self,exc,task_id,args,kwargs,einfo):
  logger.info(f'task id:{task_id},arg:{args},retry!erros:{exc}')
  #在对应的task函数的装饰器中,通过base=MyHookTask指定
  shared_task(base=MyHookTask,bind=True)
  def add(self,x,y):
  logger.info('add__-----'*10)
  logger.info('name:',self.name)
  logger.info('dir(self)',dir(self))
  return x+y


  启动celery

  celery-A worker celery_study-l debug-P eventlet


  5.任务编排


  在很多情况下,一个任务需要由多个子任务或者一个任务需要很多步骤才能完成,Celery也能实现这样的任务,完成这类型的任务通过以下模块完成:


  group:并行调度任务


  chain:链式任务调度


  chord:类似group,但分header和body2个部分,header可以是一个group任务,执行完成后调用body的任务


  map:映射调度,通过输入多个入参来多次调度同一个任务


  starmap:类似map,入参类似*args


  chunks:将任务按照一定数量进行分组


  文档:Next Steps—Celery 5.2.0b3 documentation


  1.group


  urls.py:

  path('primitive/',views.test_primitive),


  views.py:


  from.tasks import*
  from celery import group
  def test_primitive(request):


  #创建10个并列的任务


  lazy_group=group(add.s(i,i)for i in range(10))


  promise=lazy_group()


  result=promise.get()


  return JsonResponse({'function':'test_primitive','result':result})


  说明:


  通过task函数的s方法传入参数,启动任务


  上面这种方法需要进行等待,如果依然想实现异步的方式,那么就必须在tasks.py中新建一个task方法,调用group,示例如下:


  tasks.py:


  shared_task
  def group_task(num):
  return group(add.s(i,i)for i in range(num))().get()
  urls.py:
  path('first_group/',views.first_group),
  views.py:
  def first_group(request):
  ar=tasks.group_task.delay(10)
  return HttpResponse('返回first_group任务,task_id:'+ar.task_id)


  2.chain


  默认上一个任务的结果作为下一个任务的第一个参数


  def test_primitive(request):
  #等同调用mul(add(add(2,2),5),8)
  promise=chain(tasks.add.s(2,2),tasks.add.s(5),tasks.mul.s(8))()
  #72
  result=promise.get()
  return JsonResponse({'function':'test_primitive','result':result})

  3.chord


  任务分割,分为header和body两部分,hearder任务执行完在执行body,其中hearder返回结果作为参数传递给body


  def test_primitive(request):
  #header:[3,12]
  #body:xsum([3,12])
  promise=chord(header=[tasks.add.s(1,2),tasks.mul.s(3,4)],body=tasks.xsum.s())()
  result=promise.get()
  return JsonResponse({'function':'test_primitive','result':result})


  6、celery管理和监控


  celery通过flower组件实现管理和监控功能,flower组件不仅仅提供监控功能,还提供HTTP API可实现对woker和task的管理


  官网:flower·PyPI


  文档:Flower-Celery monitoring tool—Flower 1.0.1 documentation


  安装flower

  pip install flower


  启动flower

  flower-A celery_study--port=5555


  说明:


  -A:项目名


  --port:端口号


  访问


  在浏览器输入:http://127.0.0.1:5555


  通过api操作

  curl http://127.0.0.1:5555/api/workers


  到此为止,这篇文章就给大家介绍完毕了,希望能给大家带来帮助。

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/128278.html

相关文章

  • Celery实际使用与内存泄漏问题(面试)

    摘要:结论执行完任务不释放内存与原一直没有被销毁有关,因此可以适当配置小点,而任务并发数与配置项有关,每增加一个必然增加内存消耗,同时也影响到一个何时被销毁,因为是均匀调度任务至每个,因此也不宜配置过大,适当配置。 1.实际使用 ​ 监控task的执行结果:任务id,结果,traceback,children,任务状态 ​ 配置 backend=redis://127...

    0x584a 评论0 收藏0
  • django开发-使用celery搭建分布式(多节点)任务队列

    摘要:今天介绍一下如何在项目中使用搭建一个有两个节点的任务队列一个主节点一个子节点主节点发布任务,子节点收到任务并执行。 今天介绍一下如何在django项目中使用celery搭建一个有两个节点的任务队列(一个主节点一个子节点;主节点发布任务,子节点收到任务并执行。搭建3个或者以上的节点就类似了),使用到了celery,rabbitmq。这里不会单独介绍celery和rabbitmq中的知识了...

    ConardLi 评论0 收藏0
  • django使用celery

    摘要:前言针对高延时任务直接在一次网络请求中处理完毕会导致很不好的体验则可以不阻塞请求后台处理这些任务并且可以使用的进行数据库操作环境其他创建工程此时项目结构如下修改添加修改创建新创 前言: 针对高延时任务, 直接在一次网络请求中处理完毕会导致很不好的体验, celery则可以不阻塞请求后台处理这些任务, 并且可以使用django的models进行数据库操作. 环境 python model...

    meislzhua 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<