资讯专栏INFORMATION COLUMN

flink学习系列--基础知识(一)

Warren / 2926人阅读

摘要:前言最近因公司业务需求,需要使用到大数据分析。提供的可用于处理无尽的数据流。类似于把一个记录拆分成两条三条甚至是四条记录例如把一个字符串分割成一个字符数组。是一个聚合操作,如计数求和求平均等。实现把两个流连成一个流。

前言

最近因公司业务需求,需要使用到大数据分析。选择了flink,第一次听说flink我也是很懵逼的状态,不过一段时间下来有了一点心得,在这里和大家分享分享。有很多描述不准确的,大家多提提意见。

1.flink是什么,为什么要flink?

其实大数据框架有很多,比如Hadoop(批处理),Storm(流处理),Samza(流处理),Spark...但是我们选择的是flink,为什么呢?因为flink是“流式批处理”,flink将每一项视作真正的数据流。Flink提供的DataStream API可用于处理无尽的数据流。Flink可配合使用的基本组件包括:

Stream(流)是指在系统中流转的,永恒不变的无边界数据集

Operator(操作方)是指针对数据流执行操作以产生其他数据流的功能

Source(源)是指数据流进入系统的入口点

Sink(槽)是指数据流离开Flink系统后进入到的位置,槽可以是数据库或到其他系统的连接器

说了这么多,我们做一个简单的demo来体验一下flink:
假设我们在电商平台,需要近实时(5min)统计(1h内)商品点击量的前三名。然后实时展示出来。如果使用java,我们需要做一个定时任务,监听商品点击事件,然后每5min使用sql计算一下...如果数据量小,间隔时间比较长,还比较好,如果数据量大,间隔时间比较短...那服务器的压力就会贼大...但是使用flink会怎么样呢?先看下代码(40几W条数据从阿里淘宝获取,github上):

/**

Created by liuliang

on 2019/5/24

*/
public class HotItems {

public static void main(String[] args) throws Exception {

    // 创建 execution environment
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // 告诉系统按照 EventTime 处理
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    // 为了打印到控制台的结果不乱序,配置全局的并发为1,改变并发对结果正确性没有影响
    env.setParallelism(1);

    // URL fileUrl = HotItems.class.getClassLoader().getResource("D:mftcodesflink-learnningsrcmainjavacncrawlermftUserBehavior.csv");
    Path filePath = Path.fromLocalFile(new File("D:mftcodesflink-learnningsrcmainjavacncrawlermftUserBehavior.csv"));

    // 抽取 UserBehavior 的 TypeInformation,是一个 PojoTypeInfo
    PojoTypeInfo pojoType = (PojoTypeInfo) TypeExtractor.createTypeInfo(UserBehavior.class);


    // 由于 Java 反射抽取出的字段顺序是不确定的,需要显式指定下文件中字段的顺序
    String[] fieldOrder = new String[]{"userId", "itemId", "categoryId", "behavior", "timestamp"};
    // 创建 PojoCsvInputFormat
    PojoCsvInputFormat csvInput = new PojoCsvInputFormat<>(filePath, pojoType, fieldOrder);


    env
            // 创建数据源,得到 UserBehavior 类型的 DataStream
            .createInput(csvInput, pojoType)
            // 抽取出时间和生成 watermark
            .assignTimestampsAndWatermarks(new AscendingTimestampExtractor() {
                @Override
                public long extractAscendingTimestamp(UserBehavior userBehavior) {
                    // 原始数据单位秒,将其转成毫秒
                    return userBehavior.timestamp * 1000;
                }
            })
            // 过滤出只有点击的数据
            .filter(new FilterFunction() {
                @Override
                public boolean filter(UserBehavior userBehavior) throws Exception {
                    // 过滤出只有点击的数据
                    return userBehavior.behavior.equals("pv");
                }
            })
            .keyBy("itemId")
            .timeWindow(Time.minutes(60), Time.minutes(5))
            .aggregate(new CountAgg(), new WindowResultFunction())
            .keyBy("windowEnd")
            .process(new TopNHotItems(3))
            .print();

    env.execute("Hot Items Job");

}



/** 求某个窗口中前 N 名的热门点击商品,key 为窗口时间戳,输出为 TopN 的结果字符串 */
public static class TopNHotItems extends KeyedProcessFunction {

    private final int topSize;

    public TopNHotItems(int topSize) {
        this.topSize = topSize;
    }

    // 用于存储商品与点击数的状态,待收齐同一个窗口的数据后,再触发 TopN 计算
    private ListState itemState;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        ListStateDescriptor itemsStateDesc = new ListStateDescriptor<>(
                "itemState-state",
                ItemViewCount.class);
        itemState = getRuntimeContext().getListState(itemsStateDesc);
    }

    @Override
    public void processElement(
            ItemViewCount input,
            Context context,
            Collector collector) throws Exception {

        // 每条数据都保存到状态中
        itemState.add(input);
        // 注册 windowEnd+1 的 EventTime Timer, 当触发时,说明收齐了属于windowEnd窗口的所有商品数据
        context.timerService().registerEventTimeTimer(input.windowEnd + 1);
    }

    @Override
    public void onTimer(
            long timestamp, OnTimerContext ctx, Collector out) throws Exception {
        // 获取收到的所有商品点击量
        List allItems = new ArrayList<>();
        for (ItemViewCount item : itemState.get()) {
            allItems.add(item);
        }
        // 提前清除状态中的数据,释放空间
        itemState.clear();
        // 按照点击量从大到小排序
        allItems.sort(new Comparator() {
            @Override
            public int compare(ItemViewCount o1, ItemViewCount o2) {
                return (int) (o2.viewCount - o1.viewCount);
            }
        });
        // 将排名信息格式化成 String, 便于打印
        StringBuilder result = new StringBuilder();
        result.append("====================================
");
        result.append("时间: ").append(new Timestamp(timestamp-1)).append("
");
        for (int i=0; i {

    @Override
    public void apply(
            Tuple key,  // 窗口的主键,即 itemId
            TimeWindow window,  // 窗口
            Iterable aggregateResult, // 聚合函数的结果,即 count 值
            Collector collector  // 输出类型为 ItemViewCount
    ) throws Exception {
        Long itemId = ((Tuple1) key).f0;
        Long count = aggregateResult.iterator().next();
        collector.collect(ItemViewCount.of(itemId, window.getEnd(), count));
    }
}


/** COUNT 统计的聚合函数实现,每出现一条记录加一 */
public static class CountAgg implements AggregateFunction {

    @Override
    public Long createAccumulator() {
        return 0L;
    }

    @Override
    public Long add(UserBehavior userBehavior, Long acc) {
        return acc + 1;
    }

    @Override
    public Long getResult(Long acc) {
        return acc;
    }

    @Override
    public Long merge(Long acc1, Long acc2) {
        return acc1 + acc2;
    }
}


/** 商品点击量(窗口操作的输出类型) */
public static class ItemViewCount {
    public long itemId;     // 商品ID
    public long windowEnd;  // 窗口结束时间戳
    public long viewCount;  // 商品的点击量

    public static ItemViewCount of(long itemId, long windowEnd, long viewCount) {
        ItemViewCount result = new ItemViewCount();
        result.itemId = itemId;
        result.windowEnd = windowEnd;
        result.viewCount = viewCount;
        return result;
    }
}



/** 用户行为数据结构 **/
public static class UserBehavior {
    public long userId;         // 用户ID
    public long itemId;         // 商品ID
    public int categoryId;      // 商品类目ID
    public String behavior;     // 用户行为, 包括("pv", "buy", "cart", "fav")
    public long timestamp;      // 行为发生的时间戳,单位秒
}

}

实时模拟的结果:

====================================
时间: 2017-11-26 09:05:00.0
No0:  商品ID=5051027  浏览量=3
No1:  商品ID=3493253  浏览量=3
No2:  商品ID=4261030  浏览量=3
====================================


====================================
时间: 2017-11-26 09:10:00.0
No0:  商品ID=812879  浏览量=5
No1:  商品ID=2600165  浏览量=4
No2:  商品ID=2828948  浏览量=4
====================================


====================================
时间: 2017-11-26 09:15:00.0
No0:  商品ID=812879  浏览量=7
No1:  商品ID=138964  浏览量=5
No2:  商品ID=4568476  浏览量=5
====================================


====================================
时间: 2017-11-26 09:20:00.0
No0:  商品ID=812879  浏览量=8
No1:  商品ID=2338453  浏览量=8
No2:  商品ID=2563440  浏览量=7
====================================

可以看到,我们用比较简单的代码,就实现了热点TOP n的问题.可见flink使用起来还是很方便的(至少比java方便不少)。

2.flink这么强大?为甚?

从上一个例子里面,我们已经初步体会到了flink的方便之处。我想从一下几个方面解释一下:

支持多种窗口

支持table api 【第二讲介绍】

exactly-once (正好一次) 【第二讲介绍】

1. 支持多种窗口
1.1 关于flink窗口我手动画了一个简单的图:

1.2flink窗口函数
窗口函数就是这四个:ReduceFunction,AggregateFunction,FoldFunction,ProcessWindowFunction.(当然也可以自定义window)

3.flink工作流程?
dataSource ->  DataTransformation(*)  ->dataSink

3.1 登陆监控demo了解 dataSource和dataSink

    dataSource:
        基于本地集合的 source、基于文件的 source、基于网络套接字的 source、自定义的 source
        自定义source:
        a:flink提供了很多定义好的sourceFunction 比如Kafka,RabbitMq,Mysql...
        b:StreamExecutionEnvironment.addSource(sourceFunction) 自己写sourceFunction 
          (实现ParallelSourceFunction / RichParallelSourceFunction )
    dataSink:
        写入文件、打印出来、写入 socket 、自定义的 sink 
        自定义的sink 
        a:同理,dataSink提供了很多定义好的dataSink...
        b:自定义dataSink

3.2 DataTransformation(*)

    简单的Transformation示意图【图2】
    Transformation:数据转换的各种操作,有 Map / FlatMap / Filter / KeyBy / Reduce /
    Fold / Aggregations / Window / WindowAll / Union / Window join / Split / Select / Project 等,
    操作很多,可以将数据转换计算成你想要的数据。
    hello-demo
    注【1】

4.flink在我们测试环境上集成的demo

    1:登陆异地监控 (讲清楚架构关系)
    2:代理树
    

5.flink怎么发布?web操作界面简单介绍。

打jar包,设置参数(并发度,main函数等),上传


注:
【1】
map就是做一些映射,比如我们把两个字符串合并成一个字符串,把一个字符串拆成两个或者三个字符串。
flatMap类似于把一个记录拆分成两条、三条、甚至是四条记录,例如把一个字符串分割成一个字符数组。
Filter就类似于过滤。
keyBy就等效于SQL里的group by。
aggregate是一个聚合操作,如计数、求和、求平均等。
reduce就类似于MapReduce里的reduce。
join操作就有点类似于我们数据库里面的join。
connect实现把两个流连成一个流。
repartition是一个重新分区操作(还没研究)。
project操作就类似于SQL里面的snacks(还没研究)

【以上涉及到的代码,我已经上传到github上面:https://github.com/iamcrawler...】

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/74910.html

相关文章

  • flink学习系列--基础知识学习(四)

    摘要:前言这一讲将介绍一下序列化机制和过程函数。然而由于的类型擦除,自动提取并不是总是有效。开发者在自定义类上使用注解,随后创建相应的并覆盖方法。 前言 这一讲将介绍一下序列化机制和过程函数(processfunction)。 序列化机制 使用 Flink 编写处理逻辑时,新手总是容易被林林总总的概念所混淆: 为什么 Flink 有那么多的类型声明方式? BasicTypeInfo.ST...

    piglei 评论0 收藏0
  • Flink 全网最全资源(视频、博客、PPT、入门、实战、源码解析、问答等持续更新)

    摘要:由于配置流是从关系型数据库中读取,速度较慢,导致实时数据流流入数据的时候,配置信息还未发送,这样会导致有些实时数据读取不到配置信息。从数据库中解析出来,再去统计近两周占比。 showImg(https://segmentfault.com/img/remote/1460000019367651); Flink 学习项目代码 https://github.com/zhisheng17/f...

    Dr_Noooo 评论0 收藏0
  • 取之开源,用之开源——深度剖析阿里巴巴对Apache Flink的优化与改进

    摘要:基于在阿里巴巴搭建的平台于年正式上线,并从阿里巴巴的搜索和推荐这两大场景开始实现。在经过一番调研之后,阿里巴巴实时计算认为是一个非常适合的选择。接下来,我们聊聊阿里巴巴在层对又大刀阔斧地进行了哪些改进。 Apache Flink 概述 Apache Flink(以下简称Flink)是诞生于欧洲的一个大数据研究项目,原名StratoSphere。该项目是柏林工业大学的一个研究性项目,早期...

    YJNldm 评论0 收藏0

发表评论

0条评论

Warren

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<