资讯专栏INFORMATION COLUMN

Flink实现批处理离线计算

IT那活儿 / 3312人阅读
Flink实现批处理离线计算





  引  言  



笔者的项目,一直是用flink进行流处理,为什么这次写flink批处理离线计算呢,因为客户提的新需求更适合用批处理离线计算:
  1. 客户不需要实时计算的结果,认为实时结果对他们没有参考意义

  2. 客户要求计算一个月(或更长时间)内历史数据的基准值,认为一个月(或更长时间)历史数据得到的基准值才能更加准确的评估和预测未来的趋势


一 . 批处理介绍

大数据计算分为离线计算和实时计算,离线计算就是我们通常说的批计算,代表是Hadoop MapReduce、Hive等大数据技术,实时计算也被称作流计算,代表是Storm、Spark Streaming、Flink等大数据技术。

批处理在大数据世界有着悠久的历史。批处理主要操作大容量静态数据集,并在计算过程完成后返回结果。

批处理模式中使用的数据集通常符合下列特征:
有界:批处理数据集代表数据的有限集合
持久:数据通常始终存储在某种类型的持久存储位置中
大量:海量数据集

批处理非常适合需要访问全套记录才能完成的计算工作。例如在计算总数和平均数时,必须将数据集作为一个整体加以处理,而不能将其视作多条记录的集合。这些操作要求在计算进行过程中数据维持自己的状态。

需要处理大量数据的任务通常最适合用批处理操作进行处理。无论直接从持久存储设备处理数据集,或首先将数据集载入内存,批处理系统在设计过程中就充分考虑了数据的量,可提供充足的处理资源。由于批处理在应对大量持久数据方面的表现极为出色,因此经常被用于对历史数据进行分析。


二. 批处理vs流处理

相信各位已经看过诸多流计算优点的文章,流计算的优点笔者就略过, 下面说一下批处理相比流处理的优点。


1. 批处理的吞吐量大、资源利用率高

由于批量和流式处理数据粒度不一样,批量每次处理一定大小的数据块(输入一般采用文件系统),一个任务处理完一个数据块之后,才将处理好的中间数据发送给下游。流式计算则是以单条记录为单位,任务在处理完一条记录之后,然后发送给下游进行处理。流式计算来一条记录就计算一次,计算量巨大,当不需要中间值的时候,这种计算属实浪费,因此批处理的吞吐量更大、资源利用率更高、系统的开销更小。


2. 批处理容易实现精准计算

流处理数据丢失和重复处理

精确一次(exactly once)是指数据处理没有数据丢失和重复处理的现象。

流处理的数据来源一般是消息队列,是无界的,数据是一条一条获取,在加载数据时可能会出现网络连接等问题,所以流处理需要解决数据丢失和重复处理的问题,实现精确一次(exactly once)的语义相对复杂,目前storm流框架目前不支持(exactly once),spark为了支持(exactly once)引入预写日志(AWL)并且offset由Spark自身管理 ,flink为了支持(exactly once)引入快照(snapshot)机制, 虽然流处理能够解决数据丢失和重复计算问题,但需要引入各种机制,而这增加了系统消耗的资源。

批处理的数据源是静态块,比如文件,hdfs文件,批处理一次性加载一批数据,基本不会出现数据丢失和重复计算的情况。

流处理水印(watermark)忽略数据

如果说流处理引入各种机制增加资源消耗可以解决数据丢失和重复处理问题,那么对于乱序数据流则存在忽略数据的可能。

流处理数据没有边界,需要窗口(window)的概念,根据窗口来汇总计算。窗口(window)类型有很多种, 滚动窗口(Tumbling Window)、滑动窗口(Sliding Window)和会话窗口(Session window)等,窗口(window)中需要定义时间,流处理中存在事件时间(event time)和处理时间(process time)。对于乱序的数据,为此又引入了水印(watermark)机制。具体概念读者自行查阅。

水印(watermark)有一个允许延时(allow lateness)的参数, 窗口(window)接收到水印(watermark)后,再等待一段时间才会关闭窗口,如果这段时间有些数据依然没有发送过来,那就只能忽略它们了。允许延时(allow lateness)参数设置的大,系统占用的资源就多,而且允许延时(allow lateness)的参数不能设置无限大,因此如果数据源异常乱序,流处理的窗口就等不到延时数据过来就进行汇总计算,导致延时数据未处理。

批处理数据有界,所有的数据全部都会加载,不用考虑数据源的顺序,不会出现忽略数据的情况,也不需要窗口(window) ,时间,水印等机制。


三. Flink实现批处理离线计算

通过上面简单的介绍和对比,发现客户的需求更适合用批处理离线计算,由于Flink是一个流处理框架, 可以处理有边界和无边界的数据流。无边界的数据就是流数据,有边界的数据就是批数据,因此Flink也是支持批处理的。所以笔者采用Flink进行批处理计算。

以下是核心代码:

flink执行环境:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

从kafka中获取DataSet数据源,DataSet表示批处理的数据 (流处理是DataStream):

DataSetString, String>> recordsDataSetDataSet = env.createInput(KafkaInputFormat
        .buildKafkaInputFormat().setBootstrapServers(KafkaServers)
        .setGroupId(xx).setTopic(sourceTopic).finish());

继承GenericInputFormat类实现自定义获取kafka数据源 KafkaInputFormat:

public class KafkaInputFormat extends GenericInputFormatString, String>> {
  @Override
  public void open(GenericInputSplit split) throws IOException {
        consumer = new KafkaConsumer<String,String>(props);
        initPartionMap();
  }
  //获取kafka topic每个分区的偏移量,用做kafka消费结束的标识
  void initPartionMap(){
    Collection partitionInfos = consumer.partitionsFor(topic);
        List tp =new ArrayList();
        partitionInfos.forEach(partitionInfo -> {
            tp.add(new TopicPartition(topic,partitionInfo.partition()));
            consumer.assign(tp);
            consumer.seekToEnd(tp);
            partionOffsetMap.put(partitionInfo.partition(),consumer.position(new TopicPartition(topic, partitionInfo.partition())));
            partionBooleanMap.put(partitionInfo.partition(), false);
            //获取参数值后返回最初
            consumer.seekToBeginning(tp);
        });
  }
  //消费kafka是否结束
  @Override
  public boolean reachedEnd() throws IOException {
    return !partionBooleanMap.containsValue(false);
  }
  @Override
  public ConsumerRecords<String, String> nextRecord(ConsumerRecords<String, String> reuse) {
     //从kafka中获取一批数据
     final ConsumerRecords<String, String> records consumer.poll(Duration.ofMillis(pollTime));
        for (ConsumerRecord<String, String> record : records) {
          Integer partion=record.partition();
          Long offset= record.offset();
          //表示已有分区已经消费完
          if(offset+1==partionOffsetMap.get(partion)) {
            partionBooleanMap.put(partion, true);
          }
        }
     return records;
  }


四. 总结

以前的流处理计算过程经过批处理改造后,计算时间大大缩短,也不需要设置窗口(window)、等待时间(allow lateness)和水印(watermark),而且计算完成程序自动退出,不再占用系统资源。

流处理和批处理都有各自的优缺点和应用场景,应该根据项目需求选择合适的。


END


更多精彩干货分享

点击下方名片关注

IT那活儿

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/129897.html

相关文章

  • 你公司到底需不需要引入实时计算引擎?

    摘要:再如通过处理流数据生成简单的报告,如五分钟的窗口聚合数据平均值。复杂的事情还有在流数据中进行数据多维度关联聚合塞选,从而找到复杂事件中的根因。因为各种需求,也就造就了现在不断出现实时计算框架,而下文我们将重磅介绍我们推荐的实时计算框架。 前言 先广而告之,本文摘自本人《大数据重磅炸弹——实时计算框架 Flink》课程第二篇,内容首发自我的知识星球,后面持续在星球里更新,这里做个预告,今...

    HackerShell 评论0 收藏0
  • OPPO数据中台之基石:基于Flink SQL构建实数据仓库

    摘要:实际上,本身就预留了与外部元数据对接的能力,分别提供了和这两个抽象。对接外部数据源搞清楚了注册库表的过程,给我们带来这样一个思路如果外部元数据创建的表也能被转换成可识别的,那么就能被无缝地注册到。 本文整理自 2019 年 4 月 13 日在深圳举行的 Flink Meetup 会议,分享嘉宾张俊,目前担任 OPPO 大数据平台研发负责人,也是 Apache Flink contrib...

    jeffrey_up 评论0 收藏0
  • Apache Flink,流计算?不仅仅是流计算

    摘要:基于流处理机制实现批流融合相对基于批处理机制实现批流融合的思想更自然,更合理,也更有优势,因此阿里巴巴在基于支持大量核心实时计算场景的同时,也在不断改进的架构,使其朝着真正批流融合的统一计算引擎方向前进。 阿里妹导读:2018年12月下旬,由阿里巴巴集团主办的Flink Forward China在北京国家会议中心举行。Flink Forward是由Apache软件基金会授权的全球范围...

    KoreyLee 评论0 收藏0
  • Flink 从0到1学习—— 分享四本 Flink 国外的书和二十多篇 Paper 论文

    摘要:另外,将机制发扬光大,对有着非常好的支持。系统也注意到并讨论了和的问题。总结本文分享了四本相关的书籍和一份领域相关的论文列表篇,涉及的设计,实现,故障恢复,弹性扩展等各方面。 前言 之前也分享了不少自己的文章,但是对于 Flink 来说,还是有不少新入门的朋友,这里给大家分享点 Flink 相关的资料(国外数据 pdf 和流处理相关的 Paper),期望可以帮你更好的理解 Flink。...

    jollywing 评论0 收藏0
  • Jstorm到Flink 在今日头条的迁移实践

    摘要:第二个问题就是说业务团队之间没有扩大管理,预算和审核是无头绪的。支持一些高优先级的比如说支持以及窗口等特性包括说。到现在为止,整体迁移完了,还剩下十个左右的作业没有迁移完。 作者:张光辉 本文将为大家展示字节跳动公司怎么把Storm从Jstorm迁移到Flink的整个过程以及后续的计划。你可以借此了解字节跳动公司引入Flink的背景以及Flink集群的构建过程。字节跳动公司是如何兼容以...

    luckyyulin 评论0 收藏0

发表评论

0条评论

IT那活儿

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<