摘要:需要注意的是和方法生成的触发器是连续的而不是一次性的。其他的还有一次性触发器将一次性触发器变为连续型触发器,触发后再次等待触发。例如与一起用可以实现每个数据到达后的分钟进行处理,经常用于全局窗口,可以用触发器来设置停止条件。
本文参考Apache Beam官方编程手册
可以结合官方的Mobile Game 代码阅读本文。
在默认情况下,Apache Beam是不分窗的,也就是采用GlobalWindow,而如果同时也不设置自定义的触发器,那么Beam会在所有数据都收集到之后才开始对数据进行处理。这通常只能适用于有限数据且对实时性要求不高的情况。当输入为无限流数据,我们可以
1)设置合适的窗口大小(根据时间戳),在窗口末端进行数据处理;
2)设置触发器,当条件满足时触发,进行数据处理;
3)同时设置窗口和触发器。
时间戳说明:Beam的数据都是保存在PCollection中。当读入数据时,PCollection为每个元素都自动生成一个内置的时间戳,对于无限输入,数据的时间戳不同。而对于有限输入,由于是同时读入,所有的元素的时间戳都是一样的,这时候分窗是没有意义的(都在一个窗口)。而我们可以手动为每个元素设置时间戳,通常采用数据中已有的时间属性(比如日志中一般都会带有事件时间)。可以在DoFn中为数据带上时间戳,如:
@ProcessElement public void processElement(ProcessContext c) { c.outputWithTimestamp(c.element(), new Instant(XXX)); }窗口类型:
1)全局窗口
就是默认不分窗的情况。
apply(Windows.
2)固定时间大小窗口
最常见的分窗方式,按照时间戳把数据处理窗口分为固定长度。
apply(Windows.
3)滑动窗口
需要设置2个参数,窗口大小和窗口产生周期。窗口之间有重叠,通常用于计算平均数的情况(暂没用过)
4)会话窗口
一般用于相同key数据聚合,同一个key的数据之间时间间隔较大的会被分到不同的窗口。
**
水位线和超时数据:**
当使用用户自定义的时间戳时,先处理的数据并不总是时间戳较小的,有可能出现时间戳小的数据在后面才产生的情况。Beam通常会给窗口设定一个处理期限时间(图中纵轴),当超过这个时间的数据被视为超时数据,而这些期限时间的连线即水位线。
系统会根据实际情况进行预测生成水位线,在默认情况下不对超时数据进行处理,而我们可以通过设置触发器对超时数据进行额外处理。
触发器种类1)时间时间触发器
根据时间戳进行触发。
.triggering(AfterWatermark.pastEndOfWindow()//水位线到达时触发一次 .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane() .plusDelayOf(FIVE_MINUTES))//水位线之前,每次触发后第一个数据来到之后的5分钟时再触发 .withLateFirings(AfterProcessingTime.pastFirstElementInPane() .plusDelayOf(TEN_MINUTES)))//水位线之后,每次触发后第一个数据来到之后的10分钟时再触发
以上分别对水位线上中下的3种数据进行不同的处理。需要注意的是withEarlyFirings和withLateFirings方法生成的触发器是连续的而不是一次性的。
2)处理时间触发器
AfterProcessingTime.pastFirstElementInPane().plusDelayOf(FIVE_MINUTES)
如方法的字面意思,仅在第一个数据到达后的5分钟时触发一次。
3)数据驱动型触发器
AfterPane.elementCountAtleast(XX)
当处理到XX个时触发一次。需要注意的是当数据个数小于XX时永远不会触发数据处理。
4)混合触发器
将多个触发器混合起来,比如1)中的代码就是3个触发器混合。其他的还有
①Repeatedly.forever(一次性触发器)
将一次性触发器变为连续型触发器,触发后再次等待触发。例如与AfterProcessingTime.pastFirstElementInPane().plusDelayOf(FIVE_MINUTES)一起用可以实现每个数据到达后的5分钟进行处理,经常用于全局窗口,可以用orFinally(触发器)来设置停止条件。
②AfterEach.inOrder(触发器1,触发器2...)
当触发器1满足后等待触发器2...知道所有触发器满足后开始数据处理。
③AfterFirst(触发器1,触发器2..)和AfterAll(触发器1,触发器2..)
这2个分别为或,与的逻辑。
④orFinally
见①
Accumulating Mode:
If our trigger is set to .accumulatingFiredPanes, the trigger emits the following values each time it fires. Keep in mind that the trigger fires every time three elements arrive:
First trigger firing: [5, 8, 3] Second trigger firing: [5, 8, 3, 15, 19, 23] Third trigger firing: [5, 8, 3, 15, 19, 23, 9, 13, 10]
Discarding Mode:
If our trigger is set to .discardingFiredPanes, the trigger emits the following values on each firing:
First trigger firing: [5, 8, 3] Second trigger firing: [15, 19, 23] Third trigger firing: [9, 13, 10]超时数据处理
.withAllowedLateness(Duration.XXXX(XXX))
可设置允许超时多长时间的数据。
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/67679.html
摘要:与用于与的转换。其中方法返回的是在中的位置下标。对于设置了多个触发器的,自动选择最后一个触发的结算结果。其他不是线程安全的,一般建议处理方法是幂等的。 Combine与GroupByKey GroupByKey是把相关key的元素聚合到一起,通常是形成一个Iterable的value,如: cat, [1,5,9] dog, [5,2] and, [1,2,6] Combine是对聚...
摘要:最近在用做流上的异常检测,期间遇到了很多问题,但是发现网上相关的资料很少,基本只能自己啃文档和瞎尝试。其中如有错漏,欢迎指出。即从一条数据中获得时间戳,然后以的格式返回。丢弃掉中的附加信息使用这一设置时,得到的中的元素是的和组成的键值对。 最近在用Apache beam做流上的异常检测,期间遇到了很多问题,但是发现网上相关的资料很少,基本只能自己啃文档和瞎尝试。所以想把自己踩过的坑记录...
摘要:一直接访问引入的相关包使用代替给指定配置与访问本地文件一样访问文件实际测试中发现本地如能够成功读写,但是集群模式下如读写失败,原因未知。二通过访问除了直接读写的数据,还可以通过来进行读写。 一、直接访问 1.引入HDFS的相关jar包: org.apache.beam beam-sdks-java-io-hadoop-file-system 2.1.0...
摘要:要说在中常见的函数是哪一个,当然是。是一个实现了接口的抽象类,其中是数据处理方法,强制子类必须实现。以上为学习一天的总结,有错误欢迎指正。相同的是这个方法处理的都是中的一个元素。 在阅读本文前,可先看一下官方的WordCount代码, 对Apache Beam有大概的了解。 要说在Apache Beam中常见的函数是哪一个,当然是apply()。常见的写法如下: [Final Outp...
摘要:主页暂时下线社区暂时下线知识库自媒体平台微博知乎简书博客园我们不是的官方组织机构团体,只是技术栈以及的爱好者合作侵权,请联系请抄送一份到基础编程思想和大数据中文文档中文文档中文文档中文文档中文文档中文文档中文文档中文文档中文文档中文文档区块 【主页】 apachecn.org 【Github】@ApacheCN 暂时下线: 社区 暂时下线: cwiki 知识库 自媒体平台 ...
阅读 1135·2021-11-11 16:54
阅读 835·2021-10-19 11:44
阅读 1304·2021-09-22 15:18
阅读 2396·2019-08-29 16:26
阅读 2914·2019-08-29 13:57
阅读 3065·2019-08-26 13:32
阅读 1055·2019-08-26 11:58
阅读 2294·2019-08-26 10:37