资讯专栏INFORMATION COLUMN

Apache Beam采坑系列——KafkaIO

iliyaku / 883人阅读

摘要:最近在用做流上的异常检测,期间遇到了很多问题,但是发现网上相关的资料很少,基本只能自己啃文档和瞎尝试。其中如有错漏,欢迎指出。即从一条数据中获得时间戳,然后以的格式返回。丢弃掉中的附加信息使用这一设置时,得到的中的元素是的和组成的键值对。

最近在用Apache beam做流上的异常检测,期间遇到了很多问题,但是发现网上相关的资料很少,基本只能自己啃文档和瞎尝试。
所以想把自己踩过的坑记录下来,希望能对大家有所帮助。
其中如有错漏,欢迎指出。

KafkaIO

顾名思义,是从kafka上读取数据到beam上或者将beam上的数据写入到kafka中。官方文档中没有直接的教程,要在GitHub上的源码中找到相关使用说明。
Github上的Kafka源码

这里仅说明读数据部分。
maven依赖示例


    org.apache.beam
    beam-sdks-java-io-kafka
    ...

读数据示例

PCollection> lines = //这里kV后说明kafka中的key和value均为String类型
                p.apply(KafkaIO.read()
                .withBootstrapServers("hadoop1:9092, hadoop2:9092")//必需,设置kafka的服务器地址和端口
                .withTopic("mytopic")//必需,设置要读取的kafka的topic名称
                .withKeyDeserializer(StringDeserializer.class)//必需
                .withValueDeserializer(StringDeserializer.class)//必需
                .withMaxNumRecords(301)
                .withTimestampFn(new MyTimestampFunction())
                .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", "earliest"))
                .withoutMetadata()
        )

以下分别后面非必需的一些设置

1.设置最大记录条数

.withMaxNumRecords(301)

通过这个函数,可以设置最大读取的记录条数。

2.设置PCollection中元素对应的时间戳

.withTimestampFn(new MyTimestampFunction())

当不进行这个设置的时候,beam会根据当前的系统时间为每个元素分配一个时间戳。
而有的时候,我们希望用kafka的数据中自身带有的时间戳来作为PCollection中元素的时间戳,从而进行后续的窗口操作。这时就需要通过上面的函数来达到这一目的。
其中MyTimestampFunction()是我们自定义的一个函数,其要实现SerializableFunction, Instant>这个接口。
即从一条kafka数据中获得时间戳,然后以Instant(org.joda.time.Instant)的格式返回。

public class MyTimestampFunction implements SerializableFunction, Instant> {

    public Instant apply(KV input){
        String[] temps = input.getValue().split(",");
        DateTime t = new DateTime(Long.valueOf(temps[1]));
        return t.toInstant();
    }
}

3.设置读kafka数据的顺序

updateConsumerProperties(ImmutableMap.of("auto.offset.reset", "earliest"))

KafkaIO默认的数据读取顺序是从最新的数据开始。当我们开发测试的时候,如果没有一个生产者同步向kafka生产数据,那么这里就拿不到数据。(在这坑了很久,才发现这个原因...)
当我们想实现类似于kafka shell中的--from-beginning的功能的时候,即从最早的数据开始读,就需要进行这一设置。
这里不仅可以改变读取数据的顺序,按照类似的方式,还可以进行其他设置。

4.丢弃掉kafka中的附加信息

.withoutMetadata()

使用这一设置时,得到的PCollection中的元素是kafka的key和value组成的键值对。
当不使用其时,得到的PCollection中的元素是KafkaRecord。会附件很多元数据。

5.其他设置

// custom function for watermark (default is record timestamp)
 *       .withWatermarkFn(new MyWatermarkFunction())
 *
 *       // restrict reader to committed messages on Kafka (see method documentation).
 *       .withReadCommitted()
 *

在源码的使用说明中还提到另外的两个设置,但因为暂时没用到,这里就暂且省略了。

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/35904.html

相关文章

  • Apache Beam访问HDFS

    摘要:一直接访问引入的相关包使用代替给指定配置与访问本地文件一样访问文件实际测试中发现本地如能够成功读写,但是集群模式下如读写失败,原因未知。二通过访问除了直接读写的数据,还可以通过来进行读写。 一、直接访问 1.引入HDFS的相关jar包: org.apache.beam beam-sdks-java-io-hadoop-file-system 2.1.0...

    UCloud 评论0 收藏0
  • Apache Beam学习笔记——几种常见的处理类Transform

    摘要:要说在中常见的函数是哪一个,当然是。是一个实现了接口的抽象类,其中是数据处理方法,强制子类必须实现。以上为学习一天的总结,有错误欢迎指正。相同的是这个方法处理的都是中的一个元素。 在阅读本文前,可先看一下官方的WordCount代码, 对Apache Beam有大概的了解。 要说在Apache Beam中常见的函数是哪一个,当然是apply()。常见的写法如下: [Final Outp...

    Chiclaim 评论0 收藏0
  • Apache Beam的分窗与触发器

    摘要:需要注意的是和方法生成的触发器是连续的而不是一次性的。其他的还有一次性触发器将一次性触发器变为连续型触发器,触发后再次等待触发。例如与一起用可以实现每个数据到达后的分钟进行处理,经常用于全局窗口,可以用触发器来设置停止条件。 本文参考Apache Beam官方编程手册 可以结合官方的Mobile Game 代码阅读本文。 在默认情况下,Apache Beam是不分窗的,也就是采用Gl...

    NickZhou 评论0 收藏0
  • Apache beam其他学习记录

    摘要:与用于与的转换。其中方法返回的是在中的位置下标。对于设置了多个触发器的,自动选择最后一个触发的结算结果。其他不是线程安全的,一般建议处理方法是幂等的。 Combine与GroupByKey GroupByKey是把相关key的元素聚合到一起,通常是形成一个Iterable的value,如: cat, [1,5,9] dog, [5,2] and, [1,2,6] Combine是对聚...

    jasperyang 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<