摘要:免费领取验证码内容安全短信发送直播点播体验包及云服务器等套餐更多网易技术产品运营经验分享请访问网易云社区。文章来源网易云社区
本文由作者林洋港授权网易云社区发布。
作为服务端程序,我们总是需要向外界报告一些统计数据,以助于了解系统的运行情况,比如某个接口的调用时间、系统处理的请求数等等。当我们的程序以Storm Topology的形式运行时同样需要输出这些统计数据。Storm为我们提供了Metric接口,可以方便的把一些统计指标输出到指定的地方。Storm Metric的统计方式为每隔指定的时间间隔输出统计内容。本文首先介绍Storm Metric相关的接口以及它们之间的关系,然后以实际应用中的一个例子来说明如何使用Metric接口。本文使用的Storm版本为0.9.1-incubating。
IMetric是Storm用于保存统计数据的接口
public interface IMetric {
public Object getValueAndReset();
}
接口只有一个getValueAndReset方法,当需要输出统计内容时,Storm就会调用这个方法。值得注意的是getValueAndReset方法返回的是Object类型,这为统计内容的形式提供了灵活性,我们可以返回任意的类型作为统计信息,这一点在后面的例子中我们会再提到。另一个引起我们注意的地方是IMetric接口并没有声明更新统计数据的方法,这样当我们实现IMetric接口的时候就更加灵活了——参数类型、参数个数都没有限制。Storm自身提供了6个IMetric实现:AssignableMetric、CombinedMetric、CountMetric、MultiCountMetric、ReducedMetric、StateMetric。这里只介绍CountMetric和MultiCountMetric的使用方式,以印证前面说的IMetric接口统计数据更新方式的灵活性以及getValueAndReset返回Object类型的灵活性。CountMetric就是一个简单的计数器,有两个方法incr()和incrBy(long incrementBy),其getValueAndReset方法返回一个long类型的值:
public Object getValueAndReset() { long ret = _value; _value = 0; return ret; }
MultiCountMetric,顾名思义,就是多个指标的计数器,维护着一个Map,只有一个方法CountMetric scope(String key)。因此MultiCountMetric的更新方式为MultiCountMetric.scope(key).incr()或MultiCountMetric.scope(key).incrBy(long incrementBy)。它的getValueAndReset返回的是一个Map:
public Object getValueAndReset() { Map ret = new HashMap(); for(Map.Entry e : _value.entrySet()) { ret.put(e.getKey(), e.getValue().getValueAndReset()); } return ret; }
除了IMetric接口,还有另外一个接口IMetricsConsumer,它负责向外输出统计信息,即把IMetric getValueAndReset方法返回的数据输出到外面。IMetricsConsumer有三个方法
void prepare(Map stormConf, Object registrationArgument, TopologyContext context, IErrorReporter errorReporter); void handleDataPoints(TaskInfo taskInfo, Collection dataPoints); void cleanup();
其中prepare是初始化,cleanup是生命周期结束时的清理工作,handleDataPoints才是真正的统计信息输出方法,taskInfo参数存储当前task的信息(host、port、component id、task id等等),dataPoints存储的是IMetric返回的统计信息,可能是出于性能考虑,dataPoints是一个集合,包含了多个IMetric返回的数据。让我们来具体看看DataPoint这个类:
public static class DataPoint { @Override public String toString() { return "[" + name + " = " + value + "]"; } public String name; public Object value; }
name是IMetric注册时的名字,value就是IMetric getValueAndReset返回的那个Object。
Storm只提供了一个IMetricsConsumer实现——LoggingMetricsConsumer。LoggingMetricsConsumer做的事情很简单,就是把dataPoints输出到日志文件metrics.log,下面是其handleDataPoints方法的部分代码:
for (DataPoint p : dataPoints) { sb.delete(header.length(), sb.length()); sb.append(p.name) .append(padding).delete(header.length()+23,sb.length()).append(" ") .append(p.value); LOG.info(sb.toString()); }
可以看到它通过调用DataPoint的value的toString方法把统计信息输出到日志里面的,所以如果你的IMetric实现返回的是自己定义的类型,记得重载toString()方法,让统计信息以可读的格式输出。
到这里Storm的Metric接口和自带的实现基本介绍完了,接下来我们来看看怎么使用Storm自带的这些实现。首先,Storm默认的配置是关掉Metric功能的,可以有两种方式开启Metric功能: 1)在storm.yaml里面配置,这种是集群级别的设置,个人不建议这么做,所以就不多介绍了 2)conf.registerMetricsConsumer(Class klass, long parallelismHint);这是topology级别的,klass是IMetricsConsumer的实现类,parallelismHint这个参数Storm代码里面没注释我也没深入看底层的实现,这里结合自己的实验谈谈它的意义:topology是在1个或多个worker上面以多个task的方式跑的嘛,parallelismHint就是指定多少个并发来输出统计信息。这里我也不知道parallelismHint指的是多个task、worker还是supervisor,反正parallelismHint=1的时候只在特定的一个supervisor下面的metrics.log有统计信息,parallelismHint>1时可能取决于worker的数量,我测试的时候由于是在多个supervisor上跑的,因此观察到多个supervisor都有metrics.log的输出。个人经验是parallelismHint设为1,这样可以在一个supervisor下面的metrics.log就能看到所有task的统计信息。
由于我建议采用第二种方法,所以示例代码为:
//客户端注册IMetricsConsumer
conf.registerMetricsConsumer(LoggingMetricsConsumer.class);
StormSubmitter.submitTopology(name, conf, builder.createTopology());
//我们假设要统计spout某段代码的调用次数
//注册IMetric
@Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { ... metric=new CountMetric(); context.registerMetric("spout time cost", metric, 60); //因此DataPoint的name为spout time cost,60表示1分钟统计一次 ... } //更新统计数据
@Override
public void nextTuple() { if(...)... else{ ... metric.incr(); } } 这样就可以了,然后你就能在metrics.log看到统计数据了。
现在,假设我们的需求跟上面不太一样:1)metrics.log只打印我们自己维护的统计信息,屏蔽__system、__fail-count这种系统自己的统计信息;2)不只统计代码的调用次数,还要统计调用时间——最小时间、最大时间、平均时间。
第一点可以通过重载LoggingMetricsConsumer的方法来实现:
public class AppLoggingMetricsConsumer extends LoggingMetricsConsumer {
@Override public void handleDataPoints(TaskInfo taskInfo, CollectiondataPoints) { if (taskInfo.srcComponentId != null && taskInfo.srcComponentId.startsWith("__")) return; if (dataPoints == null || dataPoints.isEmpty()) return; List list = new ArrayList (); for (DataPoint p : dataPoints) { if (p.name == null || p.name.startsWith("__")) continue; list.add(p); } if (list.isEmpty()) return; super.handleDataPoints(taskInfo, list); }
}
第二点需要开发我们自己的IMetric接口实现类TimeCostMetric,以下是其主要代码:
@Override public Object getValueAndReset() { TimeCost timeCost=new TimeCost(); timeCost.count=count; if(timeCost.count>0){ timeCost.min=min; timeCost.max=max; timeCost.mean=all*1.0/timeCost.count; } init(); return timeCost; }
public void update(long time){
count++; all+=time; if(min>time)min=time; if(max
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/25377.html
摘要:项目地址前言大数据技术栈思维导图大数据常用软件安装指南一分布式文件存储系统分布式计算框架集群资源管理器单机伪集群环境搭建集群环境搭建常用命令的使用基于搭建高可用集群二简介及核心概念环境下的安装部署和命令行的基本使用常用操作分区表和分桶表视图 项目GitHub地址:https://github.com/heibaiying... 前 言 大数据技术栈思维导图 大数据常用软件安装指...
摘要:关于三者的一些概括总结离线分析框架,适合离线的复杂的大数据处理内存计算框架,适合在线离线快速的大数据处理流式计算框架,适合在线的实时的大数据处理我是一个以架构师为年之内目标的小小白。 整理自《架构解密从分布式到微服务》第七章——聊聊分布式计算.做了相应补充和修改。 [TOC] 前言 不管是网络、内存、还是存储的分布式,它们最终目的都是为了实现计算的分布式:数据在各个计算机节点上流动,同...
摘要:源码版本简介是下的一个监控项目,用于进行容器集群的监控和性能分析。基本的功能及概念介绍可以回顾我之前的一篇文章监控之介绍。在源码分析之前我们先介绍的实现流程,由上图可以看出会从各个上获取相关的监控信息,然后进行汇总发送给后台数据库。 源码版本 heapster version: release-1.2 简介 Heapster是Kubernetes下的一个监控项目,用于进行容器集群的监控...
摘要:本文所阐述的时间序列数据库,系笔者所负责产品对性能指标进行聚合分组过滤过程中的梳理和总结。而带有标志的,则是数据采集源,将数据发给服务。左面的则是的特点之一,其规则为以上属性值均为对应名称的。 【编者按】 刘斌,OneAPM后端研发工程师,拥有10多年编程经验,参与过大型金融、通信以及Android手机操作系的开发,熟悉Linux及后台开发技术。曾参与翻译过《第一本Docker书》、《...
阅读 2857·2023-04-25 17:59
阅读 686·2023-04-25 15:05
阅读 676·2021-11-25 09:43
阅读 3040·2021-10-12 10:13
阅读 3546·2021-09-27 13:59
阅读 3590·2021-09-23 11:21
阅读 3892·2021-09-08 09:35
阅读 571·2019-08-29 17:12