Flink具备高吞吐、低延迟、纯流式架构、支持对乱序事件的处理、有状态、高度灵活的窗口定制、失败恢复、故障转移、水平扩展、批处理、流处理统一的API等大数据处理优势。基于大数据的应用场景中,从数据生产,到数据收集、数据处理、数据应用,贯穿整个大数据生命周期全栈的每个环节,Flink 均可应用其中。作为新一代开源大数据计算引擎,Flink 不仅满足海量数据对实时性的需求,且能够全链路打通端到端的数据价值挖掘。
基于开源组件的架构如果能实现性能最优化,那就是高潮迭起,掌声不断,如果架构性能不能最优化,那就是爷爷赶牛车,急死孙子。
笔者所在项目的日志综管平台架构使用了Flink组件,遇到了实时计算延迟的性能问题,下面笔者将和团队一起解决该性能问题的过程分享如下。
运行模式 | Flink on yarn |
Fink | 1.7.2 |
Hadoop | 2.8.5 |
计算节点数 | 9台虚拟机(单台cpu:12核内存:64GB 硬盘:550G) |
kafka | 2.1 |
业务高峰期处理数据量(每分钟) | 860w |
生成指标 | 30个 |
跑的任务数 | 8个 |
如图所示,flink处理的业务链数据量从某一时间点突然出现断崖式下降,数据处理积压越来严重,flink 任务背压较高,同时指标出现延时生成现象(正常处理延时1分以内)。
首先通过查看flink业务链处理日志,发现疑似线索。日志显示任务连接上游kafka报Disconnection连接异常现象。当指标延时时,此错误信息报频率较高,但指标不延时偶尔也会报错,是否这就是导致问题的罪魁祸首?根据这一线索,继续刨根问底:
分析及措施:
上游kafka采用kerberos认证机制,每隔24小时需要重新认证(调用专有客户端进行认证),flink 9台计算节点上部署自动认证脚本,每隔10分钟程序自动认证,Disconnection连接异常现象出现频率减少,但指标延时情况还在存在。
调整flink 连接kafka消费topic参数
default.api.timeout.ms
session.timeout.ms
request.timeout.ms
fetch.max.wait.ms
fetch.min.bytes
调整连接参数后Disconnection连接异常现象未出现,但指标延时现象依然存在。
通过监测上游kafka topic 消费分组Lag值,发现是下游消费滞后造成数据积压现象。
分析结论:通过以上监测与优化措施,指标生成延迟问题仍未解决,排除由Kafka引起指标延时的可能性。
通过上述优化整改,Flink与kafka连接异常问题解决,但延迟的问题还是存在,革命尚未成功,吾辈仍需继续深入分析。经比对多天日志,发现每次任务重启前都有checkpoint失败,ClosedByInterruptException异常现象
分析及措施:
因为业务链业务量巨大(高峰期每分钟需处理的数据量达800万左右),在有限flink计算节点上(9台虚拟机),按照要求需要满足几十个指标在1分钟内不出现延时生成。当任务重启后如果从历史检查点恢复处理消费数据,数据量积压概率较高,无法保障指标生成不延时。所以,重启处理机制更改为每次任务重启后从当前时间点消费kafka 数据,而非从检查点开始。
关闭checkpoint后,无对应异常日志,但指标生成延迟问题依然存在。
分析结论:虽然对该可疑目标进行了tunning,但延迟依旧存在,进一步排除了checkpoint失败导致指标延时的可能性。
排除以上两种情况后,继续对flink组件本身的运行状态做全面综合深入分析。
分析及措施:
加大并发数处理:业务链kafka topic 是100 partition,正常下游Flink需要开100 个并发与partition个数一一对应起来,如此配置性能才能最优。但当前flink集群资源有限(flink集群机器上跑了其它96个并发任务),无法开启100 个并发(经测试最大可开启72 个并发)。按可用最大并发配置后,计算节点cpu 负载30%以下,但指标仍出现延时,看来扩大并发数解决不了延时问题。
线程运行状况
通过分析程序运行状态,发现shsncSpanEventStream pre -> Timestamps/Watermarks 这段逻辑有线程锁现象:
"AsyncIO-Emitter-Thread (trans stream -> Process -> shsncSpanEventStream pre -> Timestamps/Watermarks (28/72))" #181 daemon prio=5 os_prio=0 tid=0x00007f4da3cf8000 nid=0x1324c waiting for monitor entry [0x00007f4d8bdeb000]
java.lang.Thread.State: BLOCKED (on object monitor)at org.apache.flink.streaming.api.operators.async.Emitter.output(Emitter.java:125)
- waiting to lock <0x0000000610a5b028> (a java.lang.Object)at org.apache.flink.streaming.api.operators.async.Emitter.run(Emitter.java:85)
at java.lang.Thread.run(Thread.java:748)
背压运行状况
从任务背压图上看,处理延时是堵在入口解析及下游水位处理环节点逻辑上:
优化措施:
flink共享waterMaker机制,在数据源进行waterMaker注册,减少逻辑处理N倍;
对应用吐过来业务数据SPAN和SPANEVENT进行分流处理,提高程序处理速度;
增加过滤数据逻辑,过滤掉无需做指标计算的数据,减少程序数据处理量。
业务链flink任务入redis/es拆分出来做多带带算子进行入库。
任务并发数,调整为50个并发数,消费kafka topic(topic 100 partition)
实施以上优化措施后,问题依旧,延时并没有得到缓解和解决。由于在上一步为了排除checkpoint原因,关闭了checkpoint,关闭check后虽然没有解决延时问题,但是由于关闭了checkpoint程序不会因为checkpoint失败而停止,因此得以观察延时情况下程序gc和堆栈具体使用情况。
gc分析:指标延迟时,通过jstat 监测发现flink 计算节点不断做FGC,虽然FGC已经达到每1秒一次(FGC时JVM会暂停,导致程序卡顿延时),但是老年代并没有任何回收(一直是99.98),因此可以判断出现了内存泄漏,究竟是哪个位置出现了内存泄漏呢?
jmap分析: 通过jmap查看堆使用排行,惊讶的发现排在第一是Atomiclong类,占堆内存达到恐怖的2.7G,而我们的代码并没有显示使用Atomiclong类,排第二的是[C,表示字符串,字符串在程序中用的很多,排第二属正常,第三还是Atomiclong类,这个Atomiclong类究竟是哪个对象引用的呢?第四是genericonobjectpool,这个也不正常,程序中连接池对象竟然有372198个,哪里用得了这么多,还有一个jedisFactory类,一个工厂类竟然也有37万个实例,也是有问题的。
mat分析:
通过简单的jmap命令,发现很多不正常的类占据着堆内存的前几名,这些类究竟有什么关系,谁才是罪魁祸首?只好使出我们的终极MAT分析大法。
通过分析导出生成的dump, 整个dump文件有6.7G,使用32G内存的机器经过10多分钟的处理,才得到以下分析结果。
分析报告显示ScheduledThreadPoolExecutor对象持有4.3GB堆内存未释放,堆饼图中占有97%
点进去查看树图,发现ScheduledThreadPoolExecutor对象持有4.3GB堆内存全部是GenericObjectPool对象(4.3G,接近1百万个对象)
再点击GenericObjectPool展开后发现:
之前通过jmap分析排行在前的AtomicLong(排第一,占2.7G)和redisFactory类都是躲藏在GenericObjectPool对象中的。分析至此,本人的第六撸感告诉我,离事情的真相越来越近了。与redis连接相关的GenericObjectPool对象就是问题的真凶,内存泄漏的对象。
看到GenericObjectPool连接池对象不释放,首先想到的是连redis的连接池去掉。将flink任务与redis交互的代码去掉GenericObjectPool连接池,采用直接获取redisCluseter对象方式:
(上图是初始代码,JedisCluter保存在GenericObjectPool连接池中)
(去掉GenericObjectPool连接池后只获取JedisCluster对象)
结果:问题未缓解,未解决,还得继续。
由于去掉和redis的连接池未解决问题,依然生成大量GenericObjectPool对象不释放,一个推测是并发原因导致单例没有生效,生成了很多个JedisCluster对象,而JedisCluster对象包含了连接池。尝试synchronized加锁:
结果:问题仍未缓解,仍未解决,还得继续。
上两步都没有进展,从头开始分析代码,代码分析过程中发现flink十多个任务都是使用统一的redis初始化方法且只生成单个redis连接实例。十多个flink任务, 每个flink任务中又有许多地方需要用到redis连接,redis单例在这里就会成为瓶颈(数据处理不过来,进而积压)。于是变单例的redisCluseter对象为多带带变量,每个用到redis连接的类都生成redisCluseter变量,然后再与redis交互,以此使redis随Flink的连接数并发派生。
整改结果:问题得到阶段性解决,之前运行一天就出现堆和gc问题,整改后稳定运行三天后又出现同样问题。
虽然只稳定运行三天,但对笔者和整个团队来说,也还是很开心的,说明我们的方向大概率是对的。但问题复现,作为四有好青年的IT民工,咱得发扬不怕苦,不怕累的精神继续分析排查。这次排查过程中发现众多的GenericObjectPool是由ScheduledThreadPoolExector引用的,ScheduledThreadPoolExector是一个异步类,再排查flink中的异步方法。找到AsyncDataStream.unorderedWait()是异步写入redis方法,将其修改为改造后的官方flink-redis连接包,去除异步。
结果:问题解决,堆和gc一直正常
业务链指标生成正常:
指标数据量正常:
未发现有线程锁现象:
Gc 正常:
1.通过此次问题一波三折的解决过程,笔者总结在排查分析处理相关开源组件的性能问题时,要充分利用jdk自带的stat/jmap/jstack等内存分析工具及相关开源性能监测工具(如arthas)对进程运行状态进行深入分析,找出性能瓶颈,如死锁,fgc频繁等。
2.通过hadoop web管理界面,自带背压监测图及metrics监测指标图可以查看任务运行现状。条件充许情况下,建议利用Prometheus工具对metrics进行实时监测。
3.结合日志,分阶段分析任务逻辑存在的性能瓶颈,然后通过一系列的优化措施(拆分/合并/过滤/异步)提高任务处理性能。
开源组件架构的最优化使用是基于海量业务场景不断迭代进化而来,这个过程对自己对团队都是一种历练和精进。在问题得到最终解决,性能得到大幅提升,业务流畅运行后,有种发自内心的会当凌绝顶,一览众山小的成就感。最后感谢那些通宵排查问题的夜晚和我一起并肩作战的兄弟们。
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/130230.html
摘要:基于在阿里巴巴搭建的平台于年正式上线,并从阿里巴巴的搜索和推荐这两大场景开始实现。在经过一番调研之后,阿里巴巴实时计算认为是一个非常适合的选择。接下来,我们聊聊阿里巴巴在层对又大刀阔斧地进行了哪些改进。 Apache Flink 概述 Apache Flink(以下简称Flink)是诞生于欧洲的一个大数据研究项目,原名StratoSphere。该项目是柏林工业大学的一个研究性项目,早期...
摘要:另外,将机制发扬光大,对有着非常好的支持。系统也注意到并讨论了和的问题。总结本文分享了四本相关的书籍和一份领域相关的论文列表篇,涉及的设计,实现,故障恢复,弹性扩展等各方面。 前言 之前也分享了不少自己的文章,但是对于 Flink 来说,还是有不少新入门的朋友,这里给大家分享点 Flink 相关的资料(国外数据 pdf 和流处理相关的 Paper),期望可以帮你更好的理解 Flink。...
摘要:第三个就是比较重点的内容,在有赞的实践。第四部分是将实时计算化,界面化的一些实践。二有赞实时平台架构有赞的实时平台架构呢有几个主要的组成部分。实时平台提供了集群管理,项目管理,任务管理和报警监控的功能。。 一、前言 这篇主要由五个部分来组成: 首先是有赞的实时平台架构。 其次是在调研阶段我们为什么选择了 Flink。在这个部分,主要是 Flink 与 Spark 的 structure...
摘要:第三个就是比较重点的内容,在有赞的实践。第四部分是将实时计算化,界面化的一些实践。二有赞实时平台架构有赞的实时平台架构呢有几个主要的组成部分。实时平台提供了集群管理,项目管理,任务管理和报警监控的功能。。 一、前言 这篇主要由五个部分来组成: 首先是有赞的实时平台架构。 其次是在调研阶段我们为什么选择了 Flink。在这个部分,主要是 Flink 与 Spark 的 structure...
阅读 1235·2023-01-11 13:20
阅读 1543·2023-01-11 13:20
阅读 996·2023-01-11 13:20
阅读 1651·2023-01-11 13:20
阅读 3958·2023-01-11 13:20
阅读 2456·2023-01-11 13:20
阅读 1290·2023-01-11 13:20
阅读 3452·2023-01-11 13:20