摘要:有了自己的系统我觉得就很安心了,以后能够做数据处理和机器学习方面就相对方便一些。隆重推荐的工具是我很喜欢的公司,他们有很多开源的工具,我觉得是最实用的代表。是,在很多机器学习里有应用,也就是所谓的有向非循环。
最近在Prettyyes一直想建立起非常专业的data pipeline系统,然后没有很多时间,这几个礼拜正好app上线,有时间开始建立自己的 data pipeline,能够很好的做每天的数据导入,数据收集,以及数据分析。
什么是ETLETL 是常用的数据处理,在以前的公司里,ETL 差不多是数据处理的基础,要求非常稳定,容错率高,而且能够很好的监控。ETL的全称是 Extract,Transform,Load, 一般情况下是将乱七八糟的数据进行预处理,然后放到储存空间上。可以是SQL的也可以是NOSQL的,还可以直接存成file的模式。
一开始我的设计思路是,用几个cron job和celery来handle所有的处理,然后将我们的log文件存在hdfs,还有一些数据存在mysql,大概每天跑一次。核心是能够scale,稳定,容错,roll back。我们的data warehouse就放在云上,就简单处理了。
有了自己的ETL系统我觉得就很安心了,以后能够做数据处理和机器学习方面就相对方便一些。
问题来了一开始我设计的思路和Uber一开始的ETL很像,因为我觉得很方便。但是我发觉一个很严重的问题,我一个人忙不过来。首先,要至少写个前端UI来监控cron job,但是市面上的都很差。其次,容错的autorestart写起来很费劲,可能是我自己没有找到一个好的处理方法。最后部署的时候相当麻烦,如果要写好这些东西,我一个人的话要至少一个月的时间,可能还不是特别robust。在尝试写了2两天的一些碎片处理的脚本之后我发觉时间拖了实在太久了。
隆重推荐的工具airbnb是我很喜欢的公司,他们有很多开源的工具,airflow我觉得是最实用的代表。airflow 是能进行数据pipeline的管理,甚至是可以当做更高级的cron job 来使用。现在一般的大厂都说自己的数据处理是ETL,美其名曰 data pipeline,可能跟google倡导的有关。airbnb的airflow是用python写的,它能进行工作流的调度,提供更可靠的流程,而且它还有自带的UI(可能是跟airbnb设计主导有关)。话不多说,先放两张截图:
airflow里最重要的一个概念是DAG。
DAG是directed asyclic graph,在很多机器学习里有应用,也就是所谓的有向非循环。但是在airflow里你可以看做是一个小的工程,小的流程,因为每个小的工程里可以有很多“有向”的task,最终达到某种目的。在官网中的介绍里说dag的特点:
Scheduled: each job should run at a certain scheduled interval
Mission critical: if some of the jobs aren’t running, we are in trouble
Evolving: as the company and the data team matures, so does the data processing
Heterogenous: the stack for modern analytics is changing quickly, and most companies run multiple systems that need to be glued together
YEAH! It"s awesome, right? After reading all of these, I found it"s perfectly fit Prettyyes.
如何安装安装airflow超级简单,使用pip就可以,现在airflow的版本是1.6.1,但是有个小的bug,这个之后会告诉大家如何修改。
pip install airflow
这里有个坑,因为airflow涉及到很到数据处理的包,所以会安装pandas和numpy(这个Data Scientist应该都很熟悉)但是国内pip install 安装非常慢,用douban的源也有一些小的问题。我的解决方案是,直接先用豆瓣的源安装numpy 和 pandas,然后再安装airflow,自动化部署的时候可以在requirements.txt 里调整顺序就行了
如何运行摘自官方网站
# airflow needs a home, ~/airflow is the default, # but you can lay foundation somewhere else if you prefer # (optional) export AIRFLOW_HOME=~/airflow # install from pypi using pip pip install airflow # initialize the database airflow initdb # start the web server, default port is 8080 airflow webserver -p 8080
然后你就可以上web ui查看所有的dags,来监控你的进程。
如何导入dag一般第一次运行之后,airflow会在默认文件夹下生成airflow文件夹,然后你只要在里面新建一个文件dag就可以了。我这边部署在阿里云上的文件tree大概是这个样子的。
以下是我自己写的我们公司prettyyes里需要每天处理log的其中一个小的dag:
from airflow import DAG from airflow.operators import BashOperator from datetime import datetime, timedelta import ConfigParser config = ConfigParser.ConfigParser() config.read("/etc/conf.ini") WORK_DIR = config.get("dir_conf", "work_dir") OUTPUT_DIR = config.get("dir_conf", "log_output") PYTHON_ENV = config.get("dir_conf", "python_env") default_args = { "owner": "airflow", "depends_on_past": False, "start_date": datetime.today() - timedelta(days=1), "retries": 2, "retry_delay": timedelta(minutes=15), } dag = DAG("daily_process", default_args=default_args, schedule_interval=timedelta(days=1)) templated_command = "echo "single" | {python_env}/python {work_dir}/mr/LogMR.py" .format(python_env=PYTHON_ENV, work_dir=WORK_DIR) + " --start_date {{ ds }}" task = BashOperator( task_id="process_log", bash_command=templated_command, dag=dag )
写好之后,只要将这个dag放入之前建立好的dag文件夹,然后运行:
python
来确保没有语法错误。在测试里你可以看到我的
schedule_interval=timedelta(days=1)
这样我们的数据处理的任务就相当于每天跑一次。更重要的是,airflow还提供处理bash处理的接口外还有hadoop的很多接口。可以为以后连接hadoop系统提供便利。很多具体的功能可以看官方文档。
其中的一个小的bugairflow 1.6.1有一个网站的小的bug,安装成功后,点击dag里的log会出现以下页面:
这个只要将
airflow/www/utils.py
文件替换成最新的airflow github上的utils.py文件就行,具体的问题在这个:
fixes datetime issue when persisting logs
使用supervisord进行deamonairflow本身没有deamon模式,所以直接用supervisord就ok了,我们只要写4行代码。
[program:airflow_web] command=/home/kimi/env/athena/bin/airflow webserver -p 8080 [program:airflow_scheduler] command=/home/kimi/env/athena/bin/airflow scheduler
我觉得airflow特别适合小的团队,他的功能强大,而且真的部署方便。和hadoop,mrjob又可以无缝连接,对我们的业务有很大的提升。
Prettyyes 不以貌取人最肤浅
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/37685.html
摘要:下面我想介绍另一种学习思路,自顶向下的学习。是通过虚拟机创建集群,支持多种虚拟机,我这里用的。对内或对外暴露服务。和的控制器,通过配置的规则来管理。三个主要的命令行程序用了启动集群。需要在所以节点上运行,处理集群内部通讯,类似。 之前就玩过docker,但是一直不知道怎么把容器运用到生产上。构建一个docker镜像,把他run起来很简单;难的是容器的部署(CICD),容器的网络,数据持...
摘要:下面我想介绍另一种学习思路,自顶向下的学习。是通过虚拟机创建集群,支持多种虚拟机,我这里用的。对内或对外暴露服务。和的控制器,通过配置的规则来管理。三个主要的命令行程序用了启动集群。需要在所以节点上运行,处理集群内部通讯,类似。 之前就玩过docker,但是一直不知道怎么把容器运用到生产上。构建一个docker镜像,把他run起来很简单;难的是容器的部署(CICD),容器的网络,数据持...
摘要:下面我想介绍另一种学习思路,自顶向下的学习。是通过虚拟机创建集群,支持多种虚拟机,我这里用的。对内或对外暴露服务。和的控制器,通过配置的规则来管理。三个主要的命令行程序用了启动集群。需要在所以节点上运行,处理集群内部通讯,类似。 之前就玩过docker,但是一直不知道怎么把容器运用到生产上。构建一个docker镜像,把他run起来很简单;难的是容器的部署(CICD),容器的网络,数据持...
摘要:使用的公司能大大增加他们的应用程序发行频率。然而,这是战略需求,将会提高交付速度,减少错误。我们的建议是,最好进入流程定义,以实现零接触持续部署的总体目标。 在最好的时候创建用户喜欢的高质量应用程序并不是件容易的事情。更何况,要怎样做才能更快地创建用户喜欢的高质量应用程序并且能够不断改进它们呢?这就是需要引入持续集成和持续交付(CI / CD)的地方。 持续集成(CI) 什么是持续集成...
阅读 3527·2021-11-08 13:15
阅读 2081·2019-08-30 14:20
阅读 1342·2019-08-28 18:08
阅读 961·2019-08-28 17:51
阅读 1458·2019-08-26 18:26
阅读 2974·2019-08-26 13:56
阅读 1452·2019-08-26 11:46
阅读 2568·2019-08-23 14:22