基于这个需求,flink本身提供了很多的任务运行时刻Metrics相关指标,避免任务的运行处于黑盒状态,通过分析指标,可以快速的调整任务的资源、定位遇到的问题。目前获取 任务Metrics 有三种方式:
方式一:
通过flink WebUI 进入Metrics 选项卡,根据不同算子,选择需要监测指标,实时查看指标数据,缺点比较明显,无法查看历史监测数据,需要一直打开,并且无法设置告警,适合开发过程时使用。
方式二:
官方提供了一种通过 REST API获取方式指标的方式,
https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/ops/rest_api/,提供的api 主要是面向 JobManager 对象的相关动作api, 主要用于任务提交等操作,但所提供TaskManager对象指标信息很少。通过此方式获取相应的指标,需要另外开发一套自动化脚本或者程序,定期调用api 获取相关信息,前题条件是需要提前规则监测的指标及对应的请求的api 地址,增大了系统复杂程度。
方式三:
flink 提供了一种MetricsReporter机制,可以将各个组件的metrics数据,通过不同的Metric Reporter插件将数据自动暴露给外部系统,这样可以充份利用使用第三方的存储和分析能力。
目前flink已经支持了很多reporter,如Graphite、JMX、InfluxDB、Prometheus等,不管用哪一处方式,都需要额外部署第三方系统来,进行接收、解析、分析metric数据。
我们本身已有了自动化运维平台,不会考虑部署像Prometheus这样的第三方平台,需要做的是如何将metric数据跟自动运维平台告警模块进行对接使用,告警模块主要是通过kafka进行对接数据,所以采用自定义kafka reporter 解决数据对接问题。
flink metrices指标项比较多,指标数据量级跟所跑的任务个数有着直接的关系,我们关注的核心指标项,对核心指标进行规则告警,接下来介绍如何基于flink 现有的reporter 代码实现kafka reporter功能点:
1. 下载对应版本flink 分支代码,如
https://github.com/apache/flink/releases/tag/release-1.13.6
2. 解压源代码,导入开发工具,查看flink-metrics模块代码
3. 根据自己实际场景,对吐数据格式要求,参考不同自带模板代码,以flink-metrics-InfluxDB模块代码为例,新建flink-metrics-kafka
3.1 修改flink-metrics/pom.xml 文件,新增
3.2 新增KafkaReporterFactory主程序,需要实现MetricReporterFactory接口并重写方法。
@InterceptInstantiationViaReflection(
reporterClassName = "org.apache.flink.metrics.kafka.KafkaReporter")
public class KafkaReporterFactory implements MetricReporterFactory {
@Override
public MetricReporter createMetricReporter(Properties properties) {
return new KafkaReporter();
}
}
3.3 新增KafkaReporter实现类,需要继承AbstractReporter并实现Scheduled接口并重写方法,主要作用是收集指标数据,并推送到kafka。
读取配置flink配置文件conf/flink-conf.yaml kakfa 服务器地址及topic地址,初始化KafkaProducer消息生产者对象。
@Override
public void open(MetricConfig metricConfig) {
LOG.info("metricConfig:" + metricConfig);
topic = metricConfig.getString("topic", "");
if (StringUtils.isBlank(topic)) {
LOG.error("metrics.reporter.kafka_reporter.topic is null");
}
String endsWithMetric = metricConfig.getString("endswith.metricname", "").trim(); //指定需要获取指标名称
endsWithMetricList = Arrays.asList(endsWithMetric.split(","));
String bootstrapservers = metricConfig.getString("bootstrap.servers", "");
if (StringUtils.isBlank(bootstrapservers)) {
LOG.error("metrics.reporter.kafka_reporter.bootstrap.servers is null");
}
Properties properties = new Properties();
properties.put("bootstrap.servers", bootstrapservers);
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put(
"value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
kafkaProducer = new KafkaProducer<String, String>(properties);
}
指标数据的类型及格式是根据它所归属metrics类型(Counters/Gauges/Histograms/Meters)有关,然后我们可以对指标数据进行格式化输出所需要格式到kafka。
① Counters: 统计的是一个累加值,用与存储数值类型指标数据。
② Gauges:用来存储任何类型指标数据。
③ Histograms:度量值的统计结果,如平均值、最大值等。
④ Meters:用来计算平均速率,平均吞吐量等。
@Override
public void report() {
tryReport();
}
private final ObjectMapper mapper = new ObjectMapper();
private void tryReport() {
Instant timestamp = Instant.now();
try {
String job_id = "";
String job_name = "";
List metriclist = new ArrayList<>();
metriclist.addAll(gauges.values()); //获取gauges类型指标集
metriclist.addAll(counters.values());//获取gauges类型指标集
metriclist.addAll(histograms.values());//获取histograms类型指标集
metriclist.addAll(meters.values());//获取meters类型指标集
//每个指标数据里面都加上对应的job_id, job_name
for (MeasurementInfo info : metriclist) {
if (info.getName().startsWith("jobmanager_job_")
|| info.getName().startsWith("taskmanager_job_")) {
job_id = info.getTags().getOrDefault(JOB_ID, "");
job_name = info.getTags().getOrDefault(JOB_NAME, "");
if (StringUtils.isBlank(job_id) || StringUtils.isBlank(job_name)) {
LOG.error("do not get job_id or job name:{}", info);
}
break;
}
}
List
新增flink-metrics-kafka项目有两种打包方式:
基于完整的flink源码项目,进行全量打包。
保留flink maven父子结构,flink parent pom.xml 只留
3.4 修改flink配置文件conf/flink-conf.yaml,主要包括Reporter全类名,上报周期,指定所需的指标名。
metrics.reporters:kafka_reporter
metrics.reporter.kafka_reporter.factory.class:org.apache.flink.metrics.kafka.KafkaReporterFactory
metrics.reporter.kafka_reporter.interval:60 SECONDS
#kafka地址
metrics.reporter.kafka_reporter.bootstrap.servers:XXX.XXX.XXX.10:9090
#kafkatopic
metrics.reporter.kafka_reporter.topic:kafka_topic
#指标名称按后缀进行过滤,注释则不过滤
metrics.reporter.kafka_reporter.endswith.metricname:job_numRestarts,job_restartingTime,job_uptime,currentOutputWatermark,Status_JVM_CPU_Load,Status_JVM_Memory_Heap_Used
3.5 提交任务,消费kafka 可以获取对应的数据。
[
{
"name": "jobmanager.uptime",
"time": 1647314569119,
"fields": {
"value": 1703478823
},
"tags": {
"host": "bigdata-03",
"job_id": "dc7a58b3f202059cd72c3467ecedb4b7",
"job_name": "amp_zabbix_pre"
}
},
{
"name": "jobmanager.Status.JVM.Memory.Heap.Max",
"time": 1647314569119,
"fields": {
"value": 468713472
},
"tags": {
"host": "bigdata-03",
"job_id": "",
"job_name": ""
}
},
...
]
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/129566.html
摘要:从上面自定义的可以看到我们继承的就是这个类,那么来了解一下一个抽象类,继承自。该类的子类有三个,两个是抽象类,在此基础上提供了更具体的实现,另一个是。 showImg(https://segmentfault.com/img/remote/1460000016978900?w=1920&h=1641); 前言 在 《从0到1学习Flink》—— Data Source 介绍 文章中,我...
摘要:从到学习介绍从到学习介绍其中包括了和的,后面我也讲了下如何自定义自己的和。这个问题可是线上很容易遇到的关注我转载请务必注明原创地址为微信公众号另外我自己整理了些的学习资料,目前已经全部放到微信公众号了。 showImg(https://segmentfault.com/img/remote/1460000017935460?w=1280&h=853); 前言 前面 FLink 的文章中...
阅读 1346·2023-01-11 13:20
阅读 1684·2023-01-11 13:20
阅读 1132·2023-01-11 13:20
阅读 1858·2023-01-11 13:20
阅读 4100·2023-01-11 13:20
阅读 2704·2023-01-11 13:20
阅读 1385·2023-01-11 13:20
阅读 3597·2023-01-11 13:20