资讯专栏INFORMATION COLUMN

Flask+Celery+Redis实现队列化异步任务

Ali_ / 1844人阅读

摘要:使用异步框架,例如等等,装饰异步任务。它是一个专注于实时处理的任务队列,同时也支持任务调度。不存储任务状态。标识要使用的默认序列化方法的字符串。指定该任务的结果存储后端用于此任务。

概述:

        我们考虑一个场景,公司有一个需求,现在需要做一套web系统,而这套系统某些功能需要使用一些开源工具的sdk和api,或是运行一些耗时比较大的任务(单个大任务下可能有多个小任务),需要一段时间才能提供执行结果,而前端同事要求不能让用户在页面等待,需要马上提供一个返回结果给他,任务执行完后可以拿到最终结果,并且用户退出web界面或浏览器异常关闭之后,再次返回界面,执行的过程不会中断,并且支持多用户同时执行不同操作的需要

        很明显,这是一个-异步多线程-的场景,在Python中可以想到的有:

        1.引入Asyncio模块,利用多协程实现。

        2.使用Threading模块,自己编写线程任务,线程等待,睡眠,释放线程的过程。

        3.使用异步框架,例如Cerely、Tornado、Twisted等等,装饰异步任务。

        这里边最便捷且开发效率最高的应该是使用异步框架,咱们选择使用Celery来实现这个需求。

Celery介绍:

        截图与描述来自celery官网:Celery - Distributed Task Queue — Celery 5.2.0 documentation

        Celery 是一个简单、灵活且可靠的分布式系统,用于处理大量消息,同时为操作提供维护此类系统所需的工具。

        它是一个专注于实时处理的任务队列,同时也支持任务调度。

        Celery 拥有庞大而多样化的用户和贡献者社区,您应该加入我们的 IRC 或我们的邮件列表

        Celery 是开源的,并在BSD 许可下获得许可

消费者与消费结果:

        我们除了需要Celery做异步任务的处理,还需要一个中间件来充当消费者,并保存最终的任务处理结果(消费结果),这里有很多中间件可以选,例如常用的消息中间件,rabbitmq,kafka等,还可以使用mysql,redis等作为消费者并保存消费结果(因为最终的处理结果要返回给前端同事),楼主最终选择了redis。

Redis安装与配置:

        这里不再赘述windows下安装redis步骤,只介绍linux下安装redis与配置,我的机器是centos7.6:

        yum方式安装(注意:这样安装的redis不是最新版本的,如有对版本要求比较高的,建议去官网下载源码包去手动安装,官网地址:Redis,最新版本:6.2.6)

yum -y install redis

        安装完成之后配置redis.conf文件:

vi /etc/redis.conf

        修改这一行,改成 0.0.0.0,这样别的应用和组件才可以访问到redis的服务与端口:

        同理,redis的默认端口也可以在此配置里修改:

        还有一些关闭匿名访问,设置密码等配置的修改,项目若要上到公网环境下,建议配置。

        启动并测试redis服务功能是否正常:

        启动redis:        

redis-cli -h 0.0.0.0

        测试redis:

1 redis> set name "zzz"2 3 OK4 5 redis> get name6 7 "zzz"

        记住,代码并没有实际引用redis,但也需要安装redis模块,否则会报错。(redis模块版本不要太高,高了也会报错,这些坑都是楼主亲自趟过的,我这里使用2.10.6)

pip install redis==2.10.6

 Celery的安装和配置:

        windos和linux下都可以使用pip安装:

 pip install celery==3.1.25

        我的项目目录:(celeryconfig.py与__init__.py文件为celery与redis配置文件):

          

        在项目中先创建一个名为config的python目录,并在__init__.py中导入celery模块并配置:

__init__.py:

from celery import Celery,platformsplatforms.C_FORCE_ROOT = Trueapp = Celery("prod")  # 创建 Celery 实例app.config_from_object("kernel.config.celeryconfig")  # 通过 Celery 实例加载配置模块

        platforms.C_FORCE_ROOT = True 这个配置一定要有,否则会报权限问题

        在config目录下的celeryconfig.py中配置任务队列消费者与消费结果保存在redis的地址:

celeryconfig.py:

## celery配置BROKER_URL = "redis://redis-host:6379/1"  # 指定 Broker消费者,我们使用redis 1号数据库CELERY_RESULT_BACKEND = "redis://redis-host:6379/2"  # 指定 Backend,最终消费结果,我们使用redis 2号数据库CELERY_TIMEZONE = "Asia/Shanghai"  # 指定时区,默认是 UTCCELERY_IMPORTS = (  # 指定导入的任务模块    "kernel.views.api"   ## 异步任务代码文件路径即可)

        至此,前期需要的工具准备工作全部完毕,我们开始我们的开发任务。

异步任务开发:

        楼主因为主要负责后端这块,这里选择使用flask来写,整体的项目模块与版本,大概罗列下:

                        Python 3.5.4
                        Mysql  5.5.64        
                        Celery==3.1.25
                        Flask==1.1.4
                        Redis==2.10.6

        这时我们与前端同事再次详细沟通了下,初步约定如下:

        1.前端通过form表单传数据给后端,格式为json,分析:需要解析json数据。

        2.因为存在长耗时的任务,要求一旦前端请求过来,后端要马上返回一个中间结果给前端(这样解决了前端页面等待的问题),分析:需要马上提供一个返回结果。

        3.前端最终要拿到任务的最终执行结果,分析:我们需要把长耗时异步任务的最终结果推送给前端,需要任务代码最后推送执行结果。(自己先定义回调接口去测试)

1.后端Flask接口代码:

文件名称与路径:

        项目名称-kernel-view-api.py,与celery配置下的任务模块对应。

 api.py:

# -*- coding: utf-8 -*-import json, sysimport loggingimport requestsimport datetime,pymysqlimport os,subprocessfrom flask import render_template, Blueprint, request, g, abort, url_for, jsonify, session, redirect,Responsefrom kernel.models.playbook import PlayBook_filefrom kernel.utils import render_response, Retvalfrom kernel.models import dbfrom sqlalchemy import or_,textimport gitlab  ## 导入gitlab模块from kernel.config import app, cmdb_config,hcacp_configimport pymysql,uuid,hashlib,timefrom datetime import timezonebp = Blueprint("test", __name__)  ## 蓝图自己定义,这里只是实例化log = logging.getLogger(__name__)    ## 日志自己定义,这里只是实例化class status:  ## 定义一些状态码    success = 0    warning = 1    pending = 2    faild = -1## 回调接口@bp.route("/test/callback/", methods=["GET", "POST"])def ansible_aaa():    data1 = request.get_data(as_text=True)    # data2 = json.loads(data1)    log.info(data1)    return data1@bp.route("/test/add/", methods=["POST", "GET"])def devops_add():    """        获取form表单json数据    """    # return True    try:        data = request.get_data()        _data = json.loads((str(data, "utf-8")))        print(_data)    except Exception as requestdata_except:        log.error("获取表单数据异常,异常原因:%s" % requestdata_except)        return render_response(status.faild, u"获取表单数据异常,异常原因:%s" % requestdata_except, {})        ## 获取标识tag的结果    try:        """        工单json数据要带工单标识符select_tag:        create_project:新建项目申请工单        """        select_tag = _data.get("select_tag")    except Exception as request_select_tag_except:        log.error("获取表单需求标识select_tag异常,异常原因:%s" % request_select_tag_except)        return render_response(status.faild, u"获取表单需求标识select_tag异常,异常原因:%s" % request_select_tag_except, {})    try:        """             !--当参数select_tag == create_project 时,建立项目--!         """        if select_tag == "create_project":            projname = _data.get("projname")            add_project_result = add_project.delay(cmdb_config, _data)            return render_response(status.pending, u"devops系统添加项目工单任务执行中--pending--", {"项目中文名称": projname})    except Exception as do_celery_job_except:        log.error("执行异步celery任务异常,异常原因:%s" % do_celery_job_except)        return render_response(status.faild, u"执行异步celery任务异常,异常原因:%s" % do_celery_job_except, {})

这里代表前端请求过来之后,马上返回一个执行结果,满足需求2:

在devops_add接口里执行异步任务:        

        add_project_result = add_project.delay(cmdb_config, _data)

官网的示例:

        ## 1.扩号里为异步任务所需的参数

        ## 2.add_project_result 是异步任务执行的对象,包含很多属性方法,下边介绍一些常用的:

        获取任务结果和状态:
        add_project_result = task.apply_async()
        add_project_result.ready()     # 查看任务状态,返回布尔值,  任务执行完成, 返回 True, 否则返回 False.
        add_project_result.wait()      # 会阻塞等待任务完成, 返回任务执行结果,很少使用;
        add_project_result.get(timeout=1)       # 获取任务执行结果,可以设置等待时间,如果超时但任务未完成返回None;
        add_project_result.result      # 任务执行结果,未完成返回None;
        add_project_result.state       # PENDING, START, SUCCESS,任务当前的状态
        add_project_result.status      # PENDING, START, SUCCESS,任务当前的状态
        add_project_result.successful  # 任务成功返回true
        add_project_result.traceback  # 如果任务抛出了一个异常,可以获取原始的回溯信息

     

2.异步任务代码:

文件名称与路径:

        项目名称-kernel-view-api.py

api.py

解释:

        因为要满足需求3,把最终异步耗时任务的真正结果给到前端,所以我们需要在异步任务里写一个回调的操作。

         header = {"Content-Type": "application/json"}  ## 构造请求头和数据类型
        _json = {"status": sttaus.faild, "msg": u"失败", "data": {}}  ## 失败就返回给前端json类型失败

        _json = {"status": sttaus.success, "msg": u"成功", "data": {}}  ## 成功就返回给前端json类型成功

        requests.post(callback_url, headers=header, data=json.dumps(_json)) ## 带参回调请求

# -*- coding: utf-8 -*-import json, sysimport loggingimport requestsimport datetime,pymysqlimport os,subprocessfrom flask import render_template, Blueprint, request, g, abort, url_for, jsonify, session, redirect,Responsefrom kernel.utils import render_response, Retvalfrom datetime import timezonefrom kernel.config import *  ## 导入config目录下的celery配置bp = Blueprint("test", __name__)  ## 蓝图自己定义,这里只是实例化log = logging.getLogger(__name__)    ## 日志自己定义,这里只是实例化class status:  ## 定义一些状态码    success = 0    warning = 1    pending = 2    faild = -1## 示例函数:一个添加信息函数,前端给我们json数据,后端接受之后去插入数据库,完成操作并告诉前端@app.task  ## celery添加项目任务def add_project(mysql_config, _data):    try:        ## 系统添加项目信息工单        projname = _data.get("projname")  ## 项目名称,必填        prodesc= _data.get("prodesc")  ## 项目描述,必填        projctime = datetime.datetime.now()  ## 项目发布时间        callback_url = _data.get("callback_url")  ## 回调接口地址    except Exception as describe_form_except:        log.error("解析表单数据出现异常,异常原因:%s" % describe_form_except)        header = {"Content-Type": "application/json"}  ## 回调接口请求头        _json = {"status": status.faild, "msg": u"失败", "data": {}}        requests.post(callback_url, headers=header, data=json.dumps(_json))    try:        # 获取数据库连接        conn = pymysql.connect(cmdb_config.server, cmdb_config.user, cmdb_config.password, database=cmdb_config.db)        # 返回连接        cursor = conn.cursor()    except Exception as connect_except:        log.error("系统数据库连接出现异常,异常原因:%s" % connect_except)        _json = {"status": status.faild, "msg": u"失败", "data": {}}        requests.post(callback_url, headers=header, data=json.dumps(_json))    try:        proj_sql = "insert into project_tb_project (projname,prodesc,projctime) VALUES ("{}","{}","{}");".format(projname, prodesc, projctime)        cursor.execute(proj_sql)        conn.commit()        _json = {"status": status.success, "msg": u"成功", "data": {}}        requests.post(callback_url, headers=header, data=json.dumps(_json))        ## 任务执行完成之后调用回调接口,返回任务执行成功结果        log.info("系统建项目工单执行成功,%s" % proj_sql)    except Exception as do_add_project_except:        _json = {"status": status.faild, "msg": u"失败", "data": {}}        requests.post(callback_url, headers=header, data=json.dumps(_json))        log.error("执行添加项目工单异常,异常原因:%s" % do_add_project_except)        ## 任务执行完成之后调用回调接口,返回任务执行失败结果

        楼主用的最简单,没有在task里写一些属性,类似下边的这种方式还可以给task添加一些属性:

        @app.task(name="test",bind=True,base=BaseTask)

       补充介绍下异步task有的一些属性:

        TASK的一般属性:
        Task.name:任务名称;
        Task.request:当前任务的信息;
        Task.max_retries:设置重试的最大次数
        Task.throws:预期错误类的可选元组,不应被视为实际错误,而是结果失败;
        Task.rate_limit:设置此任务类型的速率限制
        Task.time_limit:此任务的硬限时(以秒为单位)。
        Task.ignore_result:不存储任务状态。默认False;
        Task.store_errors_even_if_ignored:如果True,即使任务配置为忽略结果,也会存储错误。
        Task.serializer:标识要使用的默认序列化方法的字符串。
        Task.compression:标识要使用的默认压缩方案的字符串。默认为task_compression设置。
        Task.backend:指定该任务的结果存储后端用于此任务。
        Task.acks_late:如果设置True为此任务的消息将在任务执行后确认 ,而不是在执行任务之前(默认行为),即默认任务执行之前就会发送确认;
        Task.track_started:如果True任务在工作人员执行任务时将其状态报告为“已启动”。默认是False;

我们启动celery来看下celery里在执行任务的过程中有什么变化

(1)启动项目:

楼主用的是gunicorn工具启动,配置多线程:

gunicorn.conf

        workers = 16   ## 多线程配置

        bind = "0.0.0.0:7777"

        proc_name = "websocket(项目名称)"

        limit_request_field_size = 0

        limit_request_line = 0

        log_level = "error"

        debug = True

        chdir = "/data/websocket" ## 项目目录

        启动命令:gunicorn -c  /项目目录/gunicorn.conf kernel:app

(2)启动celery:

        cd 到项目目录下,执行 celery -A kernel.views.api worker -l info  

(3)使用postman调用接口:

        可以看到直接先返回我们状态码2-等待状态:

(4)从日志看异步任务执行过程:

        1.会先在celery里出现一个异步任务,并生成一个异步任务的task-id号:

        2.redis去查看是否已有task任务,task-id号是一致的:

        用add_project_result保存异步任务执行结果的对象,最终的结果是在redis中,我们也可以去redis里去拿,redis保存的结果。

        我们用的redis 2号数据库,select 2 号数据库,keys * 查看redis是否已有任务

        任务最终的执行结果(celery日志里也可以看到,在redis里也可以看到,celery日志看的更直观,succeded代表异步任务执行成功):

        3. 查看项目日志,状态码为1,是回调接口打印出来的,代表返回给回调接口最终结果是成功。

        4.最终去数据库看下新添加记录是否已有,这里就不截图了,记录插入成功,异步任务执行成功,也满足了开始我们沟通的三个需求。

        5.前端同学给你竖起了大拇指,直呼你牛!

          

 

备注:

                ​​​​​​​        ​​​​​​​        ​​​​​​​        ​​​​​​​        ​​​​​​​        ​​​​​​​

 

 

        celery还可以用来做定时任务,感兴趣的伙伴们可以去官网或者其他途径去研究下,楼主第一次写这么大的博客,有些地方我描述不清楚的或者您没太看懂的可以私信我答疑解惑,我的微信zcw576020095,热爱python,热爱运维,一起加油!

        

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/123089.html

相关文章

  • 基于Flask-Angular的项目组网架构与部署

    摘要:基于网,分享项目的组网架构和部署。项目组网架构架构说明流项目访问分为两个流,通过分两个端口暴露给外部使用数据流用户访问网站。通过进行配置,使用作为异步队列来存储任务,并将处理结果存储在中。 基于Raindrop网,分享项目的组网架构和部署。 项目组网架构 showImg(https://cloud.githubusercontent.com/assets/7239657/1015704...

    kelvinlee 评论0 收藏0
  • Celery中使用Flask的上下文

    摘要:所以这就现实了在中使用的应用上下文。要引入请求上下文,需要考虑这两个问题如何在中产生请求上下文。中有和可以产生请求上下文。具体的思路还是在中重载类,通过,在的上下文环境下执行。将他们传入,生成伪造的请求上下文可以覆盖大多数的使用情况。 其实我只是想把邮件发送这个动作移到Celery中执行。既然用到了Celery,那么每次发邮件都单独开一个线程似乎有点多余,异步任务还是交给Celery吧...

    Sourcelink 评论0 收藏0
  • 基于Celery的分布式爬虫管理平台: Crawlab

    摘要:基于的爬虫分布式爬虫管理平台,支持多种编程语言以及多种爬虫框架。后台程序会自动发现这些爬虫项目并储存到数据库中。每一个节点需要启动应用来支持爬虫部署。任务将以环境变量的形式存在于爬虫任务运行的进程中,并以此来关联抓取数据。 Crawlab 基于Celery的爬虫分布式爬虫管理平台,支持多种编程语言以及多种爬虫框架。 Github: https://github.com/tikazyq/...

    legendaryedu 评论0 收藏0
  • 异步任务神器 Celery 简明笔记

    摘要:我们将窗口切换到的启动窗口,会看到多了两条日志这说明任务已经被调度并执行成功。本文标题为异步任务神器简明笔记本文链接为参考资料使用之美分布式任务队列的介绍思诚之道异步任务神器简明笔记 Celery 在程序的运行过程中,我们经常会碰到一些耗时耗资源的操作,为了避免它们阻塞主程序的运行,我们经常会采用多线程或异步任务。比如,在 Web 开发中,对新用户的注册,我们通常会给他发一封激活邮件,...

    Ryan_Li 评论0 收藏0
  • Django下使用celery 异步发送短信验证码

    摘要:介绍应用举例是一个基于开发的分布式异步消息任务队列,通过它可以轻松的实现任务的异步处理,如果你的业务场景中需要用到异步任务,就可以考虑使用你想对台机器执行一条批量命令,可能会花很长时间,但你不想让你的程序等着结果返回,  celery 1.celery介绍 1.1 celery应用举例 Celery 是一个 基于python开发的分布式异步消息任务队列,通过...

    everfly 评论0 收藏0

发表评论

0条评论

Ali_

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<