资讯专栏INFORMATION COLUMN

airflow探索篇

leap_frog / 2164人阅读

摘要:调度和监控工作流的平台,用于用来创建监控和调整。安装以及方式启动重要说明使用需要安装配置说明上篇在中配置的。负责调度,只支持单节点,多节点启动可能会挂掉负责执行具体中的。轮询查询状态是成功失败。如是则继续轮询,成功失败操作相应后续操作。

airflow是一个 Airbnb 的 Workflow 开源项目,在Github 上已经有超过两千星。data pipeline调度和监控工作流的平台,用于用来创建、监控和调整data pipeline。类似的产品有:Azkaban、oozie 
pip方式安装

默认已经安装python >= 2.7 以及 pip
安装可以参考这篇,比较详细。airflow安装以及celery方式启动

重要说明 使用mysql需要安装
python 2 : pip install MySQL-python
python 3 : pip install PyMySQL
AIRFLOW_HOME配置说明

上篇在.bashrc中配置的export AIRFLOW_HOME=/home/airflow/airflow01。AIRFLOW_HOME设置目录在airflow initdb的时候初始化,存放airflow的配置文件airflow.cfg及相关文件。

DAG说明-管理建议

默认$AIRFLOW_HOME/dags存放定义的dag,可以分目录管理dag。常用管理dag做法,dag存放另一个目录通过git管理,并设置软连接映射到$AIRFLOW_HOME/dag。好处方便dag编辑变更,同时dag变更不会出现编辑到一半的时候就加载到airflow中。

plugins说明-算子定义

默认$AIRFLOW_HOME/plugins存放定义的plugins,自定义组件。可以自定义operator,hook等等。我们希望可以直接使用这种模式定义机器学习的一个算子。下面定义了一个简单的加法算子。

# -*- coding: UTF-8 -*-
# !/usr/bin/env python

from airflow.plugins_manager import AirflowPlugin
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults

# Will show up under airflow.operators.plus_plugin.PluginOperator
class PlusOperator(BaseOperator):

    @apply_defaults
    def __init__(self, op_args=None, params=None, provide_context=False, set_context=False, *args, **kwargs):
        super(PlusOperator, self).__init__(*args, **kwargs)
        self.params = params or {}
        self.set_context = set_context

    def execute(self, context):
        if self.provide_context:
            context.update(self.op_kwargs)
            self.op_kwargs = context

        puls = self.op_kwargs["a"] + self.op_kwargs["b"]
        print "a =", self.op_kwargs["a"], ". b=", self.op_kwargs["a"]
        return_value = self.main()
        context[self.task_id].xcom_push(key="return_value", value=return_value)
        return puls


# Defining the plugin class
class PlusPlugin(AirflowPlugin):
    name = "plus_plugin"
    operators = [PlusOperator]

在dag中使用案例如下

from airflow.operators.plus_plugin import PlusOperator
plus_task = PlusOperator(task_id="plus_task", provide_context=True, params={"a": 1,"b":2},dag=dag)
一些命令说明
命令 说明
airflow webserver -p 8091 8091启动webserver,通过页面查询不需要可以不启动
airflow scheduler 调度器,必须启动,不然dag没法run起来(使用CeleryExecutor、LocalExecutor时)
airflow run dagid [time] run task instance
airflow backfill [dagid] -s[startTime] -e [endTime] run a backfill over 2 days
run的demo
# run your first task instance
airflow run example_bash_operator runme_0 2018-01-11

# run a backfill over 2 days
airflow backfill example_bash_operator -s 2018-01-10 -e 2018-01-11
基于CeleryExecutor方式的系统架构

使用celery方式的系统架构图(官方推荐使用这种方式,同时支持mesos方式部署)。turing为外部系统,GDags服务帮助拼接成dag,可以忽略。

1.master节点webui管理dags、日志等信息。scheduler负责调度,只支持单节点,多节点启动scheduler可能会挂掉

2.worker负责执行具体dag中的task。这样不同的task可以在不同的环境中执行。

基于LocalExecutor方式的系统架构图

另一种启动方式的思考,一个dag分配到1台机器上执行。如果task不复杂同时task环境相同,可以采用这种方式,方便扩容、管理,同时没有master单点问题。

基于源码的启动以及二次开发

很多情况airflow是不满足我们需求,就需要自己二次开发,这时候就需要基于源码方式启动。比如日志我们期望通过http的方式提供出来,同其他系统查看。airflow自动的webserver只提供页面查询的方式。

下载源码

github源码地址 : [https://github.com/apache/inc...]
git clone git@github.com:apache/incubator-airflow.git

切换分支

master分支的表初始化有坑,mysql设置的sql校验安全级别过高一直建表不成功。这个坑被整的有点惨。v1-8-stable或者v1-9-stable分支都可以。
git checkout v1-8-stable

安装必要Python包

进入incubator-airflow,python setup.py install (没啥文档说明,又是一个坑。找了半天)

初始化

直接输入airflow initdb(python setup.py install这个命令会将airflow安装进去)

修改配置

进入$AIRFLOE_HOME (默认在~/airflow),修改airflow.cfg,修改mysql配置。可以查看上面推荐的文章以及上面的[使用mysql需要安装]

启动

airflow webserver -p 8085
airflow scheduler

获取日志信息的改造

1.进入incubator-airflow/airflow/www/
2.修改views.py
在 class Airflow(BaseView)中添加下面代码

@expose("/logs")
    @login_required
    @wwwutils.action_logging
    def logs(self):
        BASE_LOG_FOLDER = os.path.expanduser(
            conf.get("core", "BASE_LOG_FOLDER"))
        dag_id = request.args.get("dag_id")
        task_id = request.args.get("task_id")
        execution_date = request.args.get("execution_date")
        dag = dagbag.get_dag(dag_id)
        log_relative = "{dag_id}/{task_id}/{execution_date}".format(
            **locals())
        loc = os.path.join(BASE_LOG_FOLDER, log_relative)
        loc = loc.format(**locals())
        log = ""
        TI = models.TaskInstance
        session = Session()
        dttm = dateutil.parser.parse(execution_date)
        ti = session.query(TI).filter(
            TI.dag_id == dag_id, TI.task_id == task_id,
            TI.execution_date == dttm).first()
        dttm = dateutil.parser.parse(execution_date)
        form = DateTimeForm(data={"execution_date": dttm})

        if ti:
            host = ti.hostname
            log_loaded = False

            if os.path.exists(loc):
                try:
                    f = open(loc)
                    log += "".join(f.readlines())
                    f.close()
                    log_loaded = True
                except:
                    log = "*** Failed to load local log file: {0}.
".format(loc)
            else:
                WORKER_LOG_SERVER_PORT = 
                    conf.get("celery", "WORKER_LOG_SERVER_PORT")
                url = os.path.join(
                    "http://{host}:{WORKER_LOG_SERVER_PORT}/log", log_relative
                ).format(**locals())
                log += "*** Log file isn"t local.
"
                log += "*** Fetching here: {url}
".format(**locals())
                try:
                    import requests
                    timeout = None  # No timeout
                    try:
                        timeout = conf.getint("webserver", "log_fetch_timeout_sec")
                    except (AirflowConfigException, ValueError):
                        pass

                    response = requests.get(url, timeout=timeout)
                    response.raise_for_status()
                    log += "
" + response.text
                    log_loaded = True
                except:
                    log += "*** Failed to fetch log file from worker.
".format(
                        **locals())

            if not log_loaded:
                # load remote logs
                remote_log_base = conf.get("core", "REMOTE_BASE_LOG_FOLDER")
                remote_log = os.path.join(remote_log_base, log_relative)
                log += "
*** Reading remote logs...
"

                # S3
                if remote_log.startswith("s3:/"):
                    log += log_utils.S3Log().read(remote_log, return_error=True)

                # GCS
                elif remote_log.startswith("gs:/"):
                    log += log_utils.GCSLog().read(remote_log, return_error=True)

                # unsupported
                elif remote_log:
                    log += "*** Unsupported remote log location."

            session.commit()
            session.close()

        if PY2 and not isinstance(log, unicode):
            log = log.decode("utf-8")

        title = "Log"

        return wwwutils.json_response(log)

3.重启服务,访问url如:

http://localhost:8085/admin/airflow/logs?task_id=run_after_loop&dag_id=example_bash_operator&execution_date=2018-01-11

就可以拿到这个任务在execution_date=2018-01-11的日志

异步任务思考

案例:task通过http请求大数据操作,拆分一些数据,存入一些临时表。
方案:
1.新建一张task实例的状态表如:task_instance_state。
2.扩展一个plugins,如:AsyncHttpOperator。AsyncHttpOperator实现逻辑:

在task_instance_state插入一条running状态记录running。

发送http请求给大数据平台,操作数据。

轮询查询task_instance_state状态是成功、失败、running。如是running则继续轮询,成功、失败操作相应后续操作。

3.提供一个restful api update task_instance_state,供大数据平台回调,修改任务实例状态。

不错的文章推荐

瓜子云的任务调度系统
Get started developing workflows with Apache Airflow
官网地址
生产环境使用可能遇到的坑
初探airflow
焦油坑
系统研究Airbnb开源项目airflow

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/41221.html

相关文章

  • 数据科学部门如何使用Python和R组合完成任务

    摘要:数据科学项目的完整流程通常是这样的五步骤需求定义数据获取数据治理数据分析数据可视化一需求定义需求定义是数据科学项目和数据科学比赛的最大不同之处,在真实情景下,我们往往对目标函数自变量约束条件都并不清晰。 概述 和那些数据科学比赛不同,在真实的数据科学中,我们可能更多的时间不是在做算法的开发,而是对需求的定义和数据的治理。所以,如何更好的结合现实业务,让数据真正产生价值成了一个更有意义的...

    Apollo 评论0 收藏0
  • [原]数据科学教程:如何使用Airflow调度数据科学工作流

    摘要:概述是一个我们正在用的工作流调度器,相对于传统的任务管理,很好的为我们理清了复杂的任务依赖关系监控任务执行的情况。步骤三修改默认数据库找到配置文件修改配置注意到,之前使用的的方式是行不通的。微信号商业使用请联系作者。 showImg(https://segmentfault.com/img/remote/1460000006760428?w=1918&h=1556); 概述 Airfl...

    v1 评论0 收藏0
  • 一个适合小公司用的 data pipeline 工具

    摘要:有了自己的系统我觉得就很安心了,以后能够做数据处理和机器学习方面就相对方便一些。隆重推荐的工具是我很喜欢的公司,他们有很多开源的工具,我觉得是最实用的代表。是,在很多机器学习里有应用,也就是所谓的有向非循环。 最近在Prettyyes一直想建立起非常专业的data pipeline系统,然后没有很多时间,这几个礼拜正好app上线,有时间开始建立自己的 data pipeline,能够很...

    2i18ns 评论0 收藏0
  • [译] 解密 Airbnb 的数据流编程神器:Airflow 中的技巧和陷阱

    摘要:显然,这单独执行不起作用这将通过子操作符被作为像是自己的调度任务中那样运行。子也必须有个可用调度即使子作为其父的一部分被触发子也必须有一个调度如果他们的调度是设成,这个子操作符将不会触发任何任务。这两个例子都是缘起子操作符被当做了回填工作。 showImg(https://segmentfault.com/img/remote/1460000006768714); 前言 Airbnb的...

    zsy888 评论0 收藏0

发表评论

0条评论

leap_frog

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<