资讯专栏INFORMATION COLUMN

Flink源码分析-生成水印(Watermark)

IT那活儿 / 2070人阅读
Flink源码分析-生成水印(Watermark)
点击上方蓝字关注我们


背景


之前项目一直用的Flink-1.72版本,大多数用的流api进行开发的需求,现在扫描漏洞的时候必须升级到Flink-1.12.0或Flink-1.11.3,所以直接升级到Flink-1.12.0,发现之前用的api(assignTimestampsAndWatermarks)被设置为废弃了。

先来看看项目之前用的:

后来查资料发现Flink在1.11版本中为了实现水印的通用以及方便,对水印进行了重构。


新的水印生成接口


新版本的Flink在类classDataStream中提供了一个新的构造水印assignTimestampsAndWatermarks方法,新的接口需要传入一个WatermarkStrategy对象。

WatermarkStrategy接口继承了接口TimestampAssignerSupplier以及接口WatermarkGeneratorSupplier,上面两个接口都是支持函数式编程的。

先看一下interfaceTimestampAssignerSupplier这个接口提供的方法。

是创建一个TimestampAssigner类型的方法。那这个TimestampAssigner的在水印生成过程中起到什么作用了。先看下这个接口的定义

有一个longextractTimestamp方法,作用是从Flink消费的记录中抽取时间,既可以理解为我们如果要通过业务时间进行统计时,需要通过该方法对来提取记录的业务时间。所以用到业务时间的话,一定要根据自己的业务场景对该方法进行具体的实现。否则Flink会提供一个默认的实现RecordTimestampAssigner<>()

而默认实现的内容也十分简单,一起看一下,必须是记录中已经注册了时间属性。

接下来interfaceWatermarkGeneratorSupplier这个接口。

是返回一个WatermarkGenerator类型的方法,继续看下interfaceWatermarkGenerator做了哪些操作

提供了两个水印发送的方式,接下来对这两个方式进行说明:

onEvent每条记录进来都会调用一次这个方法,入参有3个,第一个是记录,第二个是记录携带的时间,如果注册了时间就会有,第三个参数时水印发射器WatermarkOutputoutput,可以通过这个参数对水印进行发射,用户可以根据自己的业务场景来编写自己的水印生成以及发射逻辑。该方法的重点是每条记录都会调用.

onPeriodicEmit: 该方法是Flink提供的一个定时器方法,每隔一段时间会调用此方法,入参是WatermarkOutputoutput,用户可以通过这个方法每隔一段时间发送一次水印,当记录数过多时,每条记录都发送一次水印明显不合适,也影响性能,此时可以通过这个方法进行水印的定时发送,而onEvent只记录当前水印而选择不发射出去。该方法的参数配置为env.getConfig().setAutoWatermarkInterval(300L),入参是毫秒数,表示隔多少毫秒向下游算子发送一次水印。

而WatermarkStrategy中也提供了一些常用的WatermarkGenerator供用户使用,比如

BoundedOutOfOrdernessWatermarks类中就是一个在onEvent中记录水印,通过onPeriodicEmit方法定时向下游发送水印的实现,构造参数maxOutOfOrderness是提供给记录乱序的,运行最大延迟间隔。MaxTimestamp是当前的水印记录。BoundedOutOfOrdernessWatermarks的大致实现如下

使用方法也十分的简单,提供的是一个静态方法,只需直接调用即可

WatermarkStrategy.>forBoundedOutOfOrderness(Duration.ofSeconds(1))


使用水印


最后结合项目的需求将原来的使用水印的地方改成如下了

类图及FLINK水印算子简要流程

先上类图,方便理解

接着简单介绍下流程

首先TimestampsAndWatermarksOperator算子会在open方法中初始化用户定义的水印逻辑及方式,并且如果需要定时发送水印会,注册一个定时器触发水印定时发送。

当元素到达算子后会调用processElement(StreamRecordelement)

方法很简单,如果元素已经被注册了时间,就直接获取时间,或者设置为LONG.MIN_VALUE,然后根据用户定义的timestampAssigner.extractTimestamp从记录中抽取时间属性,然后再将时间写入元素中,最后调用用户定义的watermarkGenerator.onEvent方法,根据用户的逻辑选择刷新水印以及是否发射水印。

上面初始化中提到了,如果需要定时发送水印,则会注册一个定时器,而定时器的方法如下

通过onProcessingTime来触发定时器的内容,而内容也十分简单,先调用用户定义的watermarkGenerator.onPeriodicEmit方法发送水印,然后获取当前时间,最后注册当前时间加水印定时发送间隔的定时触发器,等待下次触发该方法。


参考资料

https://zhuanlan.zhihu.com/p/158951593

https://blog.csdn.net/zhaoyuqiang/article/details/107453466


END




文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/130018.html

相关文章

  • Flink 灵魂两百问,这谁顶得住?

    摘要:由于配置流是从关系型数据库中读取,速度较慢,导致实时数据流流入数据的时候,配置信息还未发送,这样会导致有些实时数据读取不到配置信息。从数据库中解析出来,再去统计近两周占比。 Flink 学习 https://github.com/zhisheng17/flink-learning 麻烦路过的各位亲给这个项目点个 star,太不易了,写了这么多,算是对我坚持下来的一种鼓励吧! showI...

    Guakin_Huang 评论0 收藏0
  • Flink实战(七) - Time & Windows编程

    摘要:在这种情况下,清除仅指窗口中的数据元,而不是窗口元数据。紫色圆圈表示流的数据元,这些数据元由某个键在这种情况下是用户,用户和用户划分。 0 相关源码 掌握Flink中三种常用的Time处理方式,掌握Flink中滚动窗口以及滑动窗口的使用,了解Flink中的watermark。 Flink 在流处理工程中支持不同的时间概念。 1 处理时间(Processing time) 执行相应算子...

    Meils 评论0 收藏0
  • Flink 从0到1学习—— 分享四本 Flink 国外的书和二十多篇 Paper 论文

    摘要:另外,将机制发扬光大,对有着非常好的支持。系统也注意到并讨论了和的问题。总结本文分享了四本相关的书籍和一份领域相关的论文列表篇,涉及的设计,实现,故障恢复,弹性扩展等各方面。 前言 之前也分享了不少自己的文章,但是对于 Flink 来说,还是有不少新入门的朋友,这里给大家分享点 Flink 相关的资料(国外数据 pdf 和流处理相关的 Paper),期望可以帮你更好的理解 Flink。...

    jollywing 评论0 收藏0
  • 《从0到1学习Flink》—— Flink 中几种 Time 详解

    摘要:每小时窗口将包括在系统时钟指示整个小时之间到达特定操作的所有事件。平行流中的水印水印是在源函数处生成的,或直接在源函数之后生成的。源函数的每个并行子任务通常独立生成其水印。由于其输入流更新其事件时间,因此操作员也是如此。 showImg(https://segmentfault.com/img/remote/1460000017877320?w=1280&h=857); 前言 Flin...

    zsy888 评论0 收藏0
  • Flink 全网最全资源(视频、博客、PPT、入门、实战、源码解析、问答等持续更新)

    摘要:由于配置流是从关系型数据库中读取,速度较慢,导致实时数据流流入数据的时候,配置信息还未发送,这样会导致有些实时数据读取不到配置信息。从数据库中解析出来,再去统计近两周占比。 showImg(https://segmentfault.com/img/remote/1460000019367651); Flink 学习项目代码 https://github.com/zhisheng17/f...

    Dr_Noooo 评论0 收藏0

发表评论

0条评论

IT那活儿

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<