资讯专栏INFORMATION COLUMN

spark Dstreams-kafka数据源

IT那活儿 / 431人阅读
spark Dstreams-kafka数据源

点击上方“IT那活儿”,关注后了解更多内容,不管IT什么活儿,干就完了!!!


01


简   介


Spark Streaming+Kafka集成在实际应用中是非常常见的,其中kafka需要是0.10.0版本及以上。Kafka 0.10的Spark Streaming集成提供了简单的并行性、Kafka分区和Spark分区之间的1:1对应关系以及对偏移量和元数据的访
但是,由于较新的集成使用了新的Kafka consumer API而不是简单的API,因此在使用上存在显著差异。


02


案例及说明


首先需要添加依赖:
Stream中的每条记录是一个ConsumerRecord实体。如果Spark batch持续时间大于默认的Kafka心跳会话超时(30秒),请适当增加heartbeat.interval.ms和session.timeout.ms。
于大于5分钟的批处理,这将需要在代理上更改group.max.session.timeout.ms。
新的Kafka消费API将把消息预取到缓冲区中。因此,出于性能原因,Spark integration将缓存的使用者保留在执行器上(而不是为每个批处理重新创建它们),并且更愿意在具有适当使用者的主机位置上调度分区,这一点很重要。
在大多数情况下,您应该使用LocationStrategies.PreferConsistent,如上所示。这将在可用的执行器之间均匀地分配分区。如果您的执行者与您的Kafka代理位于相同的主机上,请使用PreferBrokers,这将更倾向于在Kafka leader上为该分区安排分区。
最后,如果分区之间的负载有明显的偏差,请使用PreferFixed。这允许您指定分区到主机的显式映射(任何未指定的分区都将使用一致的位置)。
消费者的缓存的默认最大大小为64.如果您希望处理超过(64 *个执行程序数)Kafka分区,则可以通过spark.streaming.kafka.consumer.cache.maxCapacity更改此设置。
如果要禁用Kafka使用者的缓存,可以将spark.streaming.Kafka.consumer.cache.enabled设置为false。
缓存由topicpartition和group.id设置密钥,因此对createDirectStream的每次调用使用多带带的group.id。
新的Kafka consumer API有许多不同的方法来指定主题,其中一些方法需要大量的对象实例化后设置。ConsumerStrategies提供了一个抽象,允许Spark即使在从检查点重新启动后也能获得正确配置的使用者。
如上所示,Subscribe允许您订阅固定的主题集合。SubscribePattern允许您使用正则表达式指定感兴趣的主题。请注意,与0.8集成不同,使用Subscribe或SubscribePattern应该响应在运行流期间添加分区。最后,Assign允许您指定一个固定的分区集合这三种策略都有重载构造函数,允许您指定特定分区的起始偏移量。
如果你有一个更适合批处理的用例,那么可以创建RDD来定义偏移范围:
获取偏移量:
请注意,只有在createDirectStream结果上调用的第一个方法中,而不是在随后的方法链中,才能成功地将类型转换为HasOffsetRanges。
请注意,RDD分区和Kafka分区之间的一对一映射在任何洗牌或重新分区的方法(例如reduceByKey()或window())之后都不会保留。


03


偏移量管理


kafka在失败情况下传输语义取决于偏移量的存储方式和存储时间,spark输出操作至少一次,如果你想只有一次输出,则必须在幂等输出后存储偏移量,或者在原子事务中与输出一起存储偏移量,通过这种集成,为了提高可靠性,您有三个选项来存储偏移量。
1) Checkpoint
如果启用checkpointing,偏移量将存储在检查点中。这很容易实现,但也有缺点。您的输出操作必须是幂等的,因为您将得到重复的输出;交易不是一种选择。此外,如果应用程序代码已更改,则无法从检查点恢复。对于计划的升级,您可以通过在旧代码的同时运行新代码来缓解这一问题(因为输出无论如何都需要是幂等的,所以它们不应该冲突)。但对于需要更改代码的计划外故障,您将丢失数据,除非您有另一种方法来识别已知良好的起始偏移量。
2)kafka自己管理偏移量
Kafka有一个特殊的topic用来存储偏移量,默认情况下,消费者会自动定期提交偏移量,但是这肯定不是你想要的,因为轮询期间消息可能还未输出,这就是上面的流示例将“enable.auto.commit”设置为false的原因,但是在知道输出已存储后,可以手动将偏移提交到Kafka,与检查点相比,Kafka的好处在于无论应用程序代码如何更改,它都是一个持久的存储。然而,卡夫卡不是事务性的,所以您的输出仍然必须是幂等的。
3)自定义存储
对于支持事务的数据存储,将偏移保存在与结果相同的事务中可以使两者保持同步,即使在失败的情况下也是如此。
如果在检测重复或跳过的偏移量范围时非常小心,则回滚事务可防止重复或丢失的消息影响结果。这给出了精确一次语义的等价物,甚至对于聚合产生的输出也可以使用这种策略,聚合通常很难使其成为幂等的。



本文作者:潘宗昊

本文来源:IT那活儿(上海新炬王翦团队)

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/129613.html

相关文章

  • Spark 快速入门

    摘要:数据科学任务主要是数据分析领域,数据科学家要负责分析数据并建模,具备统计预测建模机器学习等方面的经验,以及一定的使用或语言进行编程的能力。监控运行时性能指标信息。 Spark Spark 背景 什么是 Spark 官网:http://spark.apache.org Spark是一种快速、通用、可扩展的大数据分析引擎,2009年诞生于加州大学伯克利分校AMPLab,2010年开源,20...

    wangshijun 评论0 收藏0
  • Spark SQL知识点与实战

    摘要:是最新的查询起始点,实质上是和的组合,所以在和上可用的在上同样是可以使用的。转换为转换为其实就是对的封装,所以可以直接获取内部的注意此时得到的存储类型为是具有强类型的数据集合,需要提供对应的类型信息。Spark SQL概述1、什么是Spark SQLSpark SQL是Spark用于结构化数据(structured data)处理的Spark模块。与基本的Spark RDD API不同,Sp...

    番茄西红柿 评论0 收藏2637

发表评论

0条评论

IT那活儿

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<