资讯专栏INFORMATION COLUMN

[译] 解密 Airbnb 的数据流编程神器:Airflow 中的技巧和陷阱

zsy888 / 2800人阅读

摘要:显然,这多带带执行不起作用这将通过子操作符被作为像是自己的调度任务中那样运行。子也必须有个可用调度即使子作为其父的一部分被触发子也必须有一个调度如果他们的调度是设成,这个子操作符将不会触发任何任务。这两个例子都是缘起子操作符被当做了回填工作。

前言

Airbnb的数据工程师 Maxime Beauchemin 激动地表示道:Airflow 是一个我们正在用的工作流调度器,现在的版本已经更新到1.6.1了,并且引入了一些列调度引擎的改革。我们喜欢它是因为它写代码太容易了,也便于调试和维护。我们也喜欢全都用他来写代码,而不是像xml那样的配置文件用来描述DAG。更不用说,我们显然不用再学习太多东西。

任务隔离

在一个分布式环境中,宕机是时有发生的。Airflow通过自动重启任务来适应这一变化。到目前为止一切安好。当我们有一系列你想去重置状态的任务时,你就会发现这个功能简直是救世主。为了解决这个问题,我们的策略是建立子DAG。这个子DAG任务将自动重试自己的那一部分,因此,如果你以子DAG设置任务为永不重试,那么凭借子DAG操作你就可以得到整个DAG成败的结果。如果这个重置是DAG的第一个任务设置子DAG的策略就会非常有效,对于有一个相对复杂的依赖关系结构设置子DAG是非常棒的做法。注意到子DAG操作任务不会正确地标记失败任务,除非你从GitHub用了最新版本的Airflow。解决这个问题的另外一个策略是使用重试柄:

def make_spooq_exporter(table, schema, task_id, dag):
     return SpooqExportOperator(
        jdbc_url=("jdbc:mysql://%s/%s?user=user&password=pasta"
                    % (TARGET_DB_HOST,TARGET_DB_NAME)),
        target_table=table,
        hive_table="%s.%s" % (schema, table),
        dag=dag,
        on_retry_callback=truncate_db,
        task_id=task_id)
    
def truncate_db(context):
    hook = MySqlHook("clean_db_export")
    hook.run(
        "truncate `%s`"%context["task_instance"].task.target_table,
        autocommit=False,
        parameters=None)

这样你的重试柄就可以将任务隔离,每次执行某个特定的任务。

代码定义任务

这在执行一个特定的可重复的任务时非常管用。用代码来定义工作流是这个系统最强大之处是你可以以编码的方式产生DAG。这在在没有人工干预的情况下自动接入新的数据源的时候非常有用。

我们借助现有的日志目录将检查HDFS日志融入DAG,并且在每次融入这些数据的时候在每个目录下产生一个任务。示例代码如下:

lognames = list(
    hdfs.list_filenames(conf.get("incoming_log_path"), full_path=False))


for logname in lognames:
    # TODO 使用适当的正则表达式来过滤掉不良日志名,使得Airflow 能用符合特定的字符找出相应任务的名字
    if logname not in excluded_logs and "%" not in logname and "@" not in logname:

        ingest = LogIngesterOperator(
            # 因为log_name以作为unicode返回值,所以需要用str()包装task_id
            task_id=str("ingest_%s" % logname),
            db=conf.get("hive_db"),
            logname=logname,
            on_success_callback=datadog_api.check_data_lag,
            dag=dp_dag
        )

        ingest.set_upstream(transfer_from_incoming)
        ingest.set_downstream(transform_hive)
今日事,今日毕

在每天结束的时候执行每日任务,而不是在当天工作开始的时候去执行这些任务。你不能将子DAG放在DAG文件夹下,换句话说除非你保管一类DAG,否则你不可以将子DAG放在自己的模块中。

子DAG与主DAG不能嵌套

或者更具体地说就是,虽然你也可以将子DAG放在DAG文件夹下,但是接着子DAG将先主DAG一样运行自己的调度。这里是一个两个DAG的例子(假设他们同时在DAG文件夹下,也就是所谓的差DAG)这里的子DAG将在主DAG中通过调度器被多带带调度。

from airflow.models import DAG
from airflow.operators import PythonOperator, SubDagOperator
from bad_dags.subdag import hive_dag
from datetime import timedelta, datetime

main_dag = DAG(
    dag_id="main_dag",
    schedule_interval=timedelta(hours=1),
    start_date=datetime(2015, 9, 18, 21)
)

# 显然,这多带带执行不起作用
transform_hive = SubDagOperator(
    subdag=hive_dag,
    task_id="hive_transform",
    dag=main_dag,
    trigger_rule=TriggerRule.ALL_DONE
)
from airflow.models import DAG
from airflow.operators import HiveOperator
from datetime import timedelta, datetime

# 这将通过子DAG操作符被作为像是自己的调度任务中那样运行。
hive_dag = DAG("main_dag.hive_transform",
          # 注意到这里的重复迭代
           schedule_interval=timedelta(hours=1),
           start_date=datetime(2015, 9, 18, 21))

hive_transform = HiveOperator(task_id="flatten_tables",
                              hql=send_charge_hql,
                              dag=dag)

除非你真的想这个子DAG被主DAG调度。

我们通过使用工厂函数解决这个问题。这是一个优势那就是 主DAG可以传递一些必要的参数到子DAG,因此他们在调度的时候其他参数也自动赋值了。当你的主DAG发生变化时,我们不需要去跟踪参数。

在下面的例子中,假设DAG是所谓的好DAG:

from airflow.models import DAG
from airflow.operators import PythonOperator, SubDagOperator
from good_dags.subdag import hive_dag
from datetime import timedelta, datetime

main_dag = DAG(
    dag_id="main_dag",
    schedule_interval=timedelta(hours=1),
    start_date=datetime(2015, 9, 18, 21)
)

# 显然,这多带带执行不起作用
transform_hive = SubDagOperator(
    subdag=hive_dag(main_dag.start_date, main_dag.schedule_interval),
    task_id="hive_transform",
    dag=main_dag,
    trigger_rule=TriggerRule.ALL_DONE
)
from airflow.models import DAG
from airflow.operators import HiveOperator

# 对调度程序来说,没有Dag的顶层模块就不起作用了
def hive_dag(start_date, schedule_interval):
  # you might like to make the name a parameter too
  dag = DAG("main_dag.hive_transform",
            # 注意这里的设置
            schedule_interval=schedule_interval,
            start_date=start_date)

  hive_transform = HiveOperator(task_id="flatten_tables",
                                hql=send_charge_hql,
                                dag=dag)
  return dag

使用工厂类使得子DAG在保障调度器从开始运行时就可维护就更强。

另一种模式是将主DAG和子DAG之间的共享设为默认参数,然后传递到工厂函数中去,(感谢 Maxime 的建议)。

子DAG也必须有个可用调度

即使子DAG作为其父DAG的一部分被触发子DAG也必须有一个调度,如果他们的调度是设成None,这个子DAG操作符将不会触发任何任务。

更糟糕的是,如果你对子DAG被禁用,接着你又去运行子DAG操作,而且还没运行完,那么以后你的子DAG就再也运行不起来了。

这将快速导致你的主DAG同时运行的任务数量一下就达到上限(默认一次写入是16个)并且这将导致调度器形同虚设。

这两个例子都是缘起子DAG操作符被当做了回填工作。这里可以看到这个

什么是DagRun:迟到的礼物

Airflow1.6的最大更新是引入了DagRun。现在,任务调度实例是由DagRun对象来创建的。

相应地,如果你想跑一个DAG而不是回填工作,你可能就需要用到DagRun。

你可以在代码里写一些airflow trigger_dag命令,或者也可以通过DagRun页面来操作。

这个巨大的优势就是调度器的行为可以被很好的理解,就像它可以遍历DagRun一样,基于正在运行的DagRun来调度任务实例。

这个服务器现在可以向我们显示每一个DagRun的状态,并且将任务实例的状态与之关联。

DagRun是怎样被调度的

新的模型也提供了一个控制调度器的方法。下一个DagRun会基于数据库里上一个DagRun的实例来调度。
除了服务峰值的例外之外,大多数实例是处于运行还是结束状态都不会影响整体任务的运行。
这意味着如果你想返回一个在现有和历史上不连续集合的部分DagRun ,你可以简单删掉这个DagRun任务实例,并且设置DagRun的状态为正在运行。

调度器应该经常重启

按照我们的经验,一个需要占用很长时间运行的调度器至少是个最终没有安排任务的CeleryExcecutor。很不幸,我们仍然不知道具体的原因。不过庆幸的是,Airflow 内建了一个以num_runs形式作标记的权宜之计。它为调度器确认了许多迭代器来在它退出之前确保执行这个循环。我们运行了10个迭代,Airbnb一般运行5个。注意到这里如果用LocalExecutor将会引发一些问题。我们现在使用chef来重启executor;我们正计划转移到supervisor上来自动重启。

操作符的依赖于依赖包

这个airflow.operators包有一些魔法,它让我们只能使用正确导入的操作符。这意味着如果你没有安装必要的依赖,你的操作符就会失效。

这是所有的 Fork! (现在)

Airflow 是正在快速迭代中,而且不只是Airbnb自己在做贡献。Airflow将会继续演化,而我也将写更多有关Airflow的技巧供大家学习使用。

如果你也对解决这些问题感兴趣,那就加入我们吧!

参考资料

Airflow官方文档

docker-airflow

Airflow 的GitHub地址

Designing workflow with Airflow

Airflow Demo

pandastrike:Airflow

Airflow review

Airflow and Hive

Youtube: Airflow An open source platform to author and monitor data pipelines

Hackenews: Airflow by airbnb is a nice alternative to luigi

Luigi vs Airflow vs Pinball

Existing Workflow systems

Jonathan Dinu: Scalable Pipelines with Luigi or: I’ll have the Data Engineering, hold the Java!

AirFlow Joins Apache Incubator

Managing Containerized Data Pipeline Dependencies With Luigi

Petabyte-Scale Data Pipelines with Docker, Luigi and Elastic Spot Instances

工作流调研 oozie vs azkaban

日拱一卒

Existing Workflow systems

Awesome Pipeline

rediit: Azkaban vs Oozie vs Airflow

推荐阅读

董老师在硅谷:[硅谷热门公司技术巡礼]1.Airbnb基础数据架构

董老师在硅谷:DAG、Workflow 系统设计、Airflow 与开源的那些事儿

[原]数据流编程教程:如何使用Airflow构建数据科学工作流

原作者:Marcin Tustin 翻译:Harry Zhu
英文原文地址:Airflow: Tips, Tricks, and Pitfalls

作为分享主义者(sharism),本人所有互联网发布的图文均遵从CC版权,转载请保留作者信息并注明作者 Harry Zhu 的 FinanceR专栏:https://segmentfault.com/blog/harryprince,如果涉及源代码请注明GitHub地址:https://github.com/harryprince。微信号: harryzhustudio
商业使用请联系作者。

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/37915.html

相关文章

  • [原]数据科学教程:如何使用Airflow调度数据科学工作流

    摘要:概述是一个我们正在用的工作流调度器,相对于传统的任务管理,很好的为我们理清了复杂的任务依赖关系监控任务执行的情况。步骤三修改默认数据库找到配置文件修改配置注意到,之前使用的的方式是行不通的。微信号商业使用请联系作者。 showImg(https://segmentfault.com/img/remote/1460000006760428?w=1918&h=1556); 概述 Airfl...

    v1 评论0 收藏0
  • [] 解密 Airbnb 数据科学部门如何构建知识仓库

    摘要:在同行评议上,我们检查方法论的改进现有工作的关联性以及准确的解释性声明。学习价值通过之前一系列的工作,现在数据科学家可以分享自己的新方法论代码技术并且加快品牌化推广,让团队之外的人可以快速了解自己的领域。 顽疾 Airbnb的数据团队很重要的一个职责就是传播基于数据的决策方法。我们将数据的获取民主化,使得每一个Airbnb的成员都可以量化他们基于数据的决策影响力并且借此洞察用户偏好,提...

    Taonce 评论0 收藏0
  • 3月份前端资源分享

    摘要:面试如何防骗一份优秀的前端开发工程师简历是怎么样的作为,有哪些一般人我都告诉他,但是他都不听的忠告如何面试前端工程师 更多资源请Star:https://github.com/maidishike... 文章转自:https://github.com/jsfront/mo... 3月份前端资源分享 1. Javascript 使用judge.js做信息判断 javascript...

    nanchen2251 评论0 收藏0
  • 蠎周刊 2015 年度最赞

    摘要:蠎周刊年度最赞亲俺们又来回顾又一个伟大的年份儿包去年最受欢迎的文章和项目如果你错过了几期就这一期不会丢失最好的嗯哼还为你和你的准备了一批纪念裇从这儿获取任何时候如果想分享好物给大家在这儿提交喜欢我们收集的任何意见建议通过来吧原文 Title: 蠎周刊 2015 年度最赞Date: 2016-01-09 Tags: Weekly,Pycoder,Zh Slug: issue-198-to...

    young.li 评论0 收藏0
  • [原]解密Airbnb 自助BI神器:Superset 颠覆 Tableau

    摘要:概述我非常认同前百度数据工程师现神策分析创始人桑老师最近谈到的数据分析三重境界统计计数多维分析机器学习数据分析的统计计数和多维分析,我们通常称之为数据探索式分析,这个步骤旨在了解数据的特性,有助于我们进一步挖掘数据的价值。 showImg(https://camo.githubusercontent.com/f98421e503a81176b003ddd310d97e1e1214625...

    Keagan 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<