摘要:学习笔记使用一个叫的文学家的名字用来命名的。引入,正式升级为分布式流处理平台。主要还是针对组成员数量减少的情况。当所有成员都退出组后,消费者组状态变更为。自动定期删除过期位移的条件就是,组要处于状态。减少下游系统一次性消费的消息总数。
Kafka 学习笔记
Kafka使用一个叫Franz Kafka的文学家的名字用来命名的。
Kafka是一款开源的消息引擎系统。也是一个分布式流处理平台。
Kafka同时支持点对点模型以及发布/订阅模型。
为什么要使用Kakfa?四个字:削峰填谷!
Kafka 术语Record:消息,指Kafka处理对象
Topic:主题,用来承载消息的容器
Partition:分区,一个有序不变的消息队列,一个主题下可以有多个分区
Offset:消息位移,表示分区中每条信息的位置,是一个单调递增不变的值
Replica,副本,数据冗余。
领导者副本:对外提供服务,与客户端进行交互
追随者副本:不能与外界进行交互,只是被动地追随领导者副本
Producer:生产者,向主题发布新消息的应用程序
Consumer:消费者,向主题订阅新消息的应用程序
Consumer Offset:消费者位移,表示消费者消费进度
Consumer Group:消费者组,多个消费者实例共同组成的一个组,同时消费多个分区来实现高吞吐。
Rebalance:重平衡,消费者组内某个消费者实例挂掉后,其他消费者实例自动重新分配订阅主题分区的过程。它是Kafka消费者端实现高可用的重要手段。
Kafka 种类Apache Kafka: 也称社区版Kafka,迭代速度快,社区响应度高,使用它可以让你有更高的把控度;缺陷在于仅仅提供基础核心组件,缺失一些高级特性
Confluent Kafka: 优势在于集成了很多高级特性且由Kafka原班人马打造,质量保证;缺陷在于国内相关资料不全,普及率较低,没有太多可参考的范例。
CDH/HDP Kafka: 优势在于操作简单,节省运维成本;缺陷在于把控度低,演进速度慢
Kafka 版本号 一个题外话Kafka新版本客户端代码开始完全由java语言编写,于是有些人开始“JAVA VS SCALA”的大讨论。并从语言特性上分析为什么社区摈弃Scala转而投向Java的怀抱。
其实事情没有那么复杂,仅仅是因为社区来了一批Java程序猿,而以前老的scala程序猿隐退了罢了。
版本演进Kafka总共演进了7个大版本
0.7版本: 上古版本,一旦有人向你推荐这个版本,怼他。
0.8版本: 开始引入副本机制,另外老版本需要制定zookeeper地址而不是Broker地址。在0.8.2.0版本社区引入了新版本Producer API,即指定Broker地址的Producer。
0.9版本: 重量级的大版本更迭。增加了基础的安全认证/权限功能,引入了Kafka Connect,新版本Producer API稳定。
0.10.0.0: 里程碑的大版本。该版本又有两个小版本,0.10.1和0.10.2。引入Kafka streams,正式升级为分布式流处理平台。0.10.2.2 新Consumer API稳定。
0.11.0.0: 目前最主流的版本之一。引入两个重量级功能变更:一个是提供幂等性Producer API以及事务 API, 另一个是对Kafka消息格式做了重构。
1.0和2.0: 如果你是Kafka Stream用户,至少选择2.0.0版本吧。
最后还有个建议,不论你使用的是哪个版本,都请尽量保持服务端版本和客户端版本一致,否则你将损失很多Kafka为你提供的性能优化收益。
江湖经验:不要轻易成为新版本的小白鼠。
集群部署磁盘容量举例:
假设公司有个业务需要每天向Kafka集群发送 1 亿条信息。每条消息保存两份来防止数据丢失。消息默认保存两周时间。并假设消息的平均大小是1KB。问你的Kafka集群需要为这个业务预留多少磁盘空间?
总大小:1亿 1KB 2备份 * 14 ~= 2800G
加上Kafka的一些索引数据,为它预留10%,那么总大小变为 2800 * (1 + 10%) ~= 3TB
Kafka支持数据压缩,压缩比0.75的话,那么应该预留的存储空间为2.25TB左右。
带宽举例
与其说是带宽资源的规划,其实真正要规划的是Kafka服务器的数量。
假设公司机房环境1Gbps,现有个业务,需要在1小时内处理1TB的业务数据。
一般单台服务器 规划使用70%的带宽资源的1/3 ~= 240Mbps。
1TB需要1小时处理,则每秒差不多需要处理2336Mbps的数据,除 240Mbps,则差不多需要10台机器。如果消息还需要额外复制的话,那么还要对应乘上备份数。
集群配置参数配置名称 | 示例 | 建议值 |
---|---|---|
log.dirs | /home/kafka1,/home/kafka2 | kafka写日志多路径,不仅能提升写性能,在1.1版本中还能支持故障转移功能。 |
zookeeper.connect | zk1:2181,zk2:2181,zk3:2181/kafka1 | |
listens | listeners=PLAINTEXT://dn1.ambari:6667 | |
auto.create.topics.enable | true | false,不建议可以自动创建主题 |
unclean.leader.election.enable | false | false,如果设置为true有丢数据风险 |
auto.leader.rebalance.enable | false | false,不定期进行leader副本的选举 |
log.retention.hours | 168 | 默认保持7天数据 |
log.retention.bytes | -1 | 保存多少数据都可以 |
message.max.bytes | 1000000 | 默认值建议调大。该值代表Broker能处理的最大消息大小 |
压缩配置
compression.type
压缩算法
总结一下压缩和解压缩,Producer端压缩,Broker端保持,Consumer端解压缩。
无消息丢失最佳实践不要使用producer.send(msg),而要使用producer.send(msg,callback)
设置acks=all,表明所有副本Broker都要接受消息,该消息才算是“已提交”
设置retries>0,表明Producer自动重试,当网络顺断时,防止消息丢失。
设置unclean.leader.election.enable=false
设置replication.factor >=3,增加副本数,保证数据冗余
设置min.insync.replicas > 1,控制的是消息至少要被写入多少个副本才算是 已提交。
确保replication.factor > min.insync.replicas。如果两者相等,那么只要有一个副本挂机,整个分区就无法正常工作了。推荐设置replication.factor = min.insync.replicas + 1
确保消息消费完再提交。设置enable.aoto.commit=false
Kafka 拦截器分为生产者拦截器和消费者拦截器。
典型的应用场景可以应用于客户端监控、端到端系统性能测试、消息审计等多种功能在内的场景。
Kafka是如何管理TCP连接的 java生产者是如何管理TCP连接的KafkaProducer实例创建时启动Sender线程,从而创建与bootstrap.servers中所有的Broker的TCP连接。
KafkaProducer实例首次更新元数据信息之后,还会再次创建与集群中所有Broker的TCP连接
如果Producer端发送信息到某台Broker时,发现没有与该Broker的TCP连接,那么也会创建连接
如果设置connections.max.idle.ms > 0,则步骤一中的TCP连接会被自动关闭;如果设置该参数-1,那么步骤一中创建的连接无法被关闭,会成为僵尸进程。
Java消费者是如何管理TCP连接的创建的3个时机
发起FindCoordinator请求时
连接协调者时
消费数据时
消费者程序会创建3类TCP连接
确定协调者和获取集群元数据
连接协调者,令其执行组成员管理操作
执行实际的消息获取
幂等生产者和事务生产者消息交付可靠性保障,常见的承诺有以下三种
最多一次:消息可能会丢失,但绝不会重复发送
至少一次:消息不会丢失,但有可能被重复发送
精确一次:消息不会丢失,也不会被重复发送
Kafka默认是最少一次
要保证精确一次,就需要幂等和事务。不过性能会想对较差。
幂等生产者幂等性有很多好处。其最大的优势在于我们可以安全地重试任何幂等性操作,反正它们不会破坏我们的系统状态。
在0.11.0.0版本引入了幂等生产者,只要更改配置props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true)。
使用幂等生产者要注意
它只能保证单分区的幂等,多分区无法实现
只能实现单会话上的幂等,重启之后幂等消失
事务生产者设置事务型Producer
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true)
设置producer端参数transctional.id。最好为其设置一个有意义的名字
此外代码也要做一些调整变化。
producer.initTransactions(); try { producer.beginTransaction(); producer.send(record1); producer.send(record2); producer.commitTransaction(); } catch (KafkaException e) { producer.abortTransaction(); }重平衡 怎么避免Rebalance
Rebalance发生的时机有三个
组成员数据量发生变化
订阅主题数量发生变化
订阅主题的分区数发生变化
后面两个通常是运维的主动操作,无法避免。主要还是针对组成员数量减少的情况。增加一般也是人为主动的。
那么避免因为参数或逻辑不合理而导致的成员退出,与之相关的主要参数
session.timeout.ms,推荐设置6s
heartbeat.interval.ms,推荐设置2s
max.poll.interval.ms,推荐设置比你的业务逻辑处理要长
GC参数,避免频繁的FULL GC
重平衡通知重平衡过程是通过 消费者端的心跳线程来通知到其他消费者实例的。
0.10.1.0版本之前,发送心跳请求是在消费者主线程完成的,也就是kafkaConsumer.poll方法的那个线程。这样做有诸多弊端,因为消息处理也是在这个线程中完成的。因此当业务逻辑处理消耗了较长时间,心跳请求就无法及时发送到协调者那边了。导致协调者 错误地认为该消费者已经死了。
0.10.1.0版本开始,社区引入了一个多带带的线程来专门执行心跳发送。
消费者组状态机定义了5种状态
各个状态的流转
一个消费者组最开始是Empty状态,当重平衡过程开启后,它会被置为PreparingRebalance状态等待成员加入,之后变更到CompletingRebalance状态等待分配方案,最后流转到Stable状态完成重平衡。
当有新成员或已有成员退出时,消费者组的状态从Stable直接跳到PreparingRebalance状态,此时,所有现存成员就必须重新申请加入组。
当所有成员都退出组后,消费者组状态变更为Empty。
Kafka自动定期删除过期位移的条件就是,组要处于Empty状态。
重平衡流程 消费者端重平衡流程JoinGroup请求
SyncGroup请求
Broker端重平衡场景分析新成员入组
组成员主动离组
组成员崩溃离组
重平衡时协调者对组内成员提交位移的处理
位移提交 CommitFailedException怎么处理?缩短消息处理的时间,该方法优先处理
增加Consumer端允许下游系统消费一批数据的最大时长。设置参数max.poll.interval.ms,新版本默认是5分钟。
减少下游系统一次性消费的消息总数。max.poll.records
下游系统使用多线程来加速消费
多消费者实例鉴于KafkaConsumer不是线程安全的事实,制定两套多线程方案。
每个线程维护专属的KafkaConsumer实例,负责完整的消息获取、消息处理流程
核心代码 ``` public class KafkaConsumerRunner implements Runnable { private final AtomicBoolean closed = new AtomicBoolean(false); private final KafkaConsumer consumer; public void run() { try { consumer.subscribe(Arrays.asList("topic")); while (!closed.get()) { ConsumerRecords records = consumer.poll(Duration.ofMillis(10000)); // 执行消息处理逻辑 } } catch (WakeupException e) { // Ignore exception if closing if (!closed.get()) throw e; } finally { consumer.close(); } } // Shutdown hook which can be called from a separate thread public void shutdown() { closed.set(true); consumer.wakeup(); } ```
消费者程序使用单或多线程获取消息,创建多个消费者线程执行消息处理逻辑
核心代码 ``` private final KafkaConsumerconsumer; private ExecutorService executors; ... private int workerNum = ...; executors = new ThreadPoolExecutor( workerNum, workerNum, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1000), new ThreadPoolExecutor.CallerRunsPolicy()); ... while (true) { ConsumerRecords records = consumer.poll(Duration.ofSeconds(1)); for (final ConsumerRecord record : records) { executors.submit(new Worker(record)); } } ```
两种方案各有特点。
监控消费进度的3种方法使用Kafka自带命令行工具kafka-consumer-groups脚本
使用Kafka Consumer API
使用Kafka自带的JMX监控指标
Kafka副本详解副本机制的好处:
提供数据冗余
提供高伸缩性
改善数据局部性
但Kafka只有第一种好处,原因是这样的设计,Kafka有两点好处
方便实现 Read-your-writes
指当你用生产者API向Kafka成功写入消息后,马上使用消费者API去读取刚才生产的消息
方便实现单调读(Monotonic Reads)
在多次消费信息时,不会看到该消息一会存在一会不存在的情况。
判断Follower副本与Leader副本是否同步的标准,Broker参数replia.lag.time.max.ms的参数值。Kafka有一个in-sync Replicas(ISR)集合的概念。
Kafka控制器控制器组件(Controller),是Kafka的核心组件,它的主要作用是在Apache Zookeeper的帮助下管理和协调整个Kafka集群。
控制器是怎么被选出来的每台Broker都能充当控制器,在Broker启动时,会尝试去Zookeeper中创建/controller节点。Kafka当前选举规则,第一个成功创建/controller节点的Broker会被指定为控制器。
控制器能做什么?主题管理
分区重分配
Prefered领导者选举
集群成员管理
数据服务,控制器上保存最全的集群元数据信息
控制器保存了什么数据?这些数据其实也在Zookeeper中存储了一份。
控制器的故障转移 总结小窍门分享:当你觉得控制器出现问题时,比如主题无法删除了,重分区hang住了,你可以不用重启broker或者控制器,快速简便的方法,直接去Zookeeper手动删除/controller节点。
这样做的好处是,既可以引发控制器的重选举,又可以避免重启Broker导致的消息中断。
Kafka请求处理 请求方案Kafka方案类似于Reactor模式
那么Kafka类似的方案是这样的。网络线程池默认参数num.network.threads=3
好了,客户端发来的请求会被Aceptor线程分发到任意一个网络线程中,由他们进行处理。你可能会认为,网络线程池是顺序处理不就好了?实际上,Kafka在这个环节上又做了一层异步线程池的处理。
IO线程池执行真正的处理。如果是PRODUCER生产请求,则将消息写入到底层的磁盘日志中;如果是FETCH请求,则从磁盘或页缓存中读取消息。当IO请求处理完请求后,会将生成的响应放入网络线程池的响应队列中,并由对应的网络线程负责将Response反还给客户端。
请求队列是所有网络线程共享的,而响应队列则是每个网络线程专属的。
IO线程池默认参数num.io.threads=8
图中还有一个Purgatory的组件,这是Kafka中著名的“炼狱”组件。
它是用来缓存延时请求的,所谓延时请求,就是那些一时未满足条件的不可立刻处理的请求。
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/75734.html
摘要:作者胡夕人人贷计算平台部总监,将在这篇专栏中一步一步的教你填平这些坑,全面提升你的实战能力搭配掘金小册图解之核心原理学习效果更佳哦送学习笔记 showImg(https://segmentfault.com/img/bVbsg9O?w=258&h=258);关注有课学微信公众号,回复暗号 kafka 获取购买《Kafka核心技术与实战》极客时间专栏地址,购买成功后提交购买截图即可获得返...
阅读 2096·2023-04-26 00:09
阅读 3114·2021-09-26 10:12
阅读 3480·2019-08-30 15:44
阅读 2862·2019-08-30 13:47
阅读 921·2019-08-23 17:56
阅读 3225·2019-08-23 15:31
阅读 474·2019-08-23 13:47
阅读 2508·2019-08-23 11:56