摘要:通过增加分区数量,能够通过部署多个消费者增加并行消费能力。然后使用了饱和策略,使得多线程处理不过来的时候,能够阻塞在的消费线程上。多线程是为了增加效率,等是为了增加可靠性。
前提:本例适合那些没有顺序要求的消息主题。
kafka通过一系列优化,写入和读取速度能够达到数万条/秒。通过增加分区数量,能够通过部署多个消费者增加并行消费能力。但还是有很多情况下,某些业务的执行速度实在是太慢,这个时候我们就要用到多线程去消费,提高应用机器的利用率,而不是一味的给kafka增加压力。
使用Spring创建一个kafka消费者是非常简单的。我们选择的方式是继承kafka的ShutdownableThread,然后实现它的doWork方法即可。
参考:https://github.com/apache/kaf...
多线程消费某个分区的数据即然是使用多线程,我们就需要新建一个线程池。
我们创建了一个最大容量为20的线程池,其中有两个参数需要注意一下。(参考《JAVA多线程使用场景和注意事项简版》)。
我们使用了了零容量的SynchronousQueue,一进一出,避免队列里缓冲数据,这样在系统异常关闭时,就能排除因为阻塞队列丢消息的可能。
然后使用了CallerRunsPolicy饱和策略,使得多线程处理不过来的时候,能够阻塞在kafka的消费线程上。
然后,我们将真正处理业务的逻辑放在任务中多线程执行,每次执行完毕,我们都手工的commit一次ack,表明这条消息我已经处理了。由于是线程池认领了这些任务,顺序性是无法保证的,可能有些任务没有执行完毕,后面的任务就已经把它的offset给提交了。o.O
不过这暂时不重要,首先让它并行化运行就好。
可惜的是,当我们运行程序,直接抛出了异常,无法进行下去。
程序直接说了:
KafkaConsumer is not safe for multi-threaded access
显然,kafka的消费端不是线程安全的,它拒绝你这么调用它的api。kafka的初衷是好的,想要避免一些并发环境的问题,但我确实需要使用多线程处理。
kafka消费者通过比较调用者的线程id来判断是否是由外部线程发起请求。
long threadId = Thread.currentThread().getId(); if (threadId != currentThread.get() && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId)) throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access"); refcount.incrementAndGet();
}
得,只能将commitSync函数放在线程外面了,先提交ack、再执行任务。
加入管道我们获取的消息,可能在真正被执行之前,会进行一些过滤,比如一些空值或者特定条件的判断。虽然可以直接放在消费者线程里运行,但显的特别的乱,可以加入一个生产者消费者模型(你可以认为这是画蛇添足)。这里采用的是阻塞队列依然是SynchronousQueue,它充当了管道的功能。
我们把任务放入管道后,立马commit。如果线程池已经满了,将一直阻塞在消费者线程里,直到有空缺。然后,我们多带带启动了一个线程,用来接收这些数据,然后提交到这部分的代码看起来大概这样。
应用能够启动了,消费速度贼快。
参数配置kafka的参数非常的多,我们比较关心的有以下几个参数。
max.poll.records调用一次poll,返回的最大条数。这个值设置的大,那么处理的就慢,很容易超出max.poll.interval.ms的值(默认5分钟),造成消费者的离线。在耗时非常大的消费中,是需要特别注意的。
enable.auto.commit是否开启自动提交(offset)如果开启,consumer已经消费的offset信息将会间歇性的提交到kafka中(持久保存)
当开启offset自动提交时,提交请求的时间频率由参数`
auto.commit.interval.ms`控制。
如果broker端反馈的数据量不足时(fetch.min.bytes),fetch请求等待的最长时间。如果数据量满足需要,则立即返回。
session.timeout.msconsumer会话超时时长,如果在此时间内,server尚未接收到consumer任何请求(包括心跳检测),那么server将会判定此consumer离线。此值越大,server等待consumer失效、rebalance时间就越长。
heartbeat.interval.msconsumer协调器与kafka集群之间,心跳检测的时间间隔。kafka集群通过心跳判断consumer会话的活性,以判断consumer是否在线,如果离线则会把此consumer注册的partition分配(assign)给相同group的其他consumer。此值必须小于“session.timeout.ms”,即会话过期时间应该比心跳检测间隔要大,通常为session.timeout.ms的三分之一,否则心跳检测就失去意义。
在本例中,我们的参数简单的设置如下,主要调整了每次获取的条数和检测时间。其他的都是默认。
仔细的同学可能会看到,我们的代码依然不是完全安全的。这是由于我们提前提交了ack导致的。程序正常运行下,这无伤大雅。但在应用异常关闭的时候,那些正在执行中的消息,很可能会丢失,对于一致性要求非常高的应用,我们要从两个手段上进行保证。
使用关闭钩子第一种就是考虑kill -15的情况。这种方式比较简单,只要覆盖ShutdownableThread的shutdown方法即可,应用将有机会执行线程池中的任务,确保消费完毕再关闭应用。
@Override public void shutdown() { super.shutdown(); executor.shutdown(); }使用日志处理
应用oom,或者直接kill -9了,事情就变得麻烦起来。
维护一个多带带的日志文件(或者本地db),在commit之前写入一条日志,然后在真正执行完毕之后写入一条对应的日志。当系统启动时,读取这些日志文件,获取没有执行成功的任务,重新执行。
想要效率,还想要可靠,是得下点苦力气的。
借助redis处理这种方式与日志方式类似,但由于redis的效率很高(可达数万),而且方便,是优于日志方式的。
可以使用Hash结构,提交任务的同时写入Redis,任务执行完毕删掉这个值,那么剩下的就是出现问题的消息。
在系统启动时,首先检测一下redis中是否有异常数据。如果有,首先处理这些数据,然后正常消费。
多线程是为了增加效率,redis等是为了增加可靠性。业务代码是非常好编写的,搞懂了逻辑就搞定了大部分;业务代码有时候又是困难的,你要编写大量辅助功能增加它的效率、照顾它的边界。
以程序员的角度来说,最有竞争力的代码都是为了照顾小概率发生的边界异常。
kafka在吞吐量和可靠性方面,有各种的权衡,很多都是鱼和熊掌的关系。不必纠结于它本身,我们可以借助外部的工具,获取更大的收益。在这种情况下,redis当机与应用同时当机的概率还是比较小的。5个9的消息保证是可以做到的,剩下的那点不完美问题消息,你为什么不从日志里找呢?
扩展阅读:
1、JAVA多线程使用场景和注意事项简版
2、Kafka基础知识索引
3、360度测试:KAFKA会丢数据么?其高可用是否满足需求?
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/73894.html
摘要:只有两个基础组件同时死亡,才会受到严重影响。的意外死亡,造成生产端发送失败。后台会有一个线程进行这些失败消息的遍历和重新投递。二阻塞业务正常进行。死亡,或者单独死亡,消息最终都会被发出,仅当与同时死亡,消息才会发送失败,并记录在日志文件里。 本工具的核心思想就是:赌。只有两个基础组件同时死亡,才会受到严重影响。哦,断电除外。 mq是个好东西,我们都在用。这也决定了mq应该是高高高可用的...
摘要:相关概念协议高级消息队列协议是一个标准开放的应用层的消息中间件协议。可以用命令与不同,不是线程安全的。手动提交执行相关逻辑提交注意点将写成单例模式,有助于减少端占用的资源。自身是线程安全的类,只要封装得当就能最恰当的发挥好的作用。 本文使用的Kafka版本0.11 先思考些问题: 我想分析一下用户行为(pageviews),以便我能设计出更好的广告位 我想对用户的搜索关键词进行统计,...
阅读 3461·2023-04-26 00:16
阅读 1339·2021-11-25 09:43
阅读 3785·2021-11-23 09:51
阅读 2943·2021-09-24 09:55
阅读 694·2021-09-22 15:45
阅读 1365·2021-07-30 15:30
阅读 3022·2019-08-30 14:04
阅读 2213·2019-08-26 13:46