摘要:前言在资源审计和计费这块,容器和虚机有很大区别。支持诸多输出,称为。所以本文主要讲如何为增加。实际上,基于增加并且更改,也可以做到,只不过需要装一些包指令,结果就是镜像变大。实际运行日志截图由于的出色的写入性能,运行非常稳定。
前言
在k8s资源审计和计费这块,容器和虚机有很大区别。相对虚机来讲,容器不容易实现。
资源指标收集可以采用heapster,也可以用prometheus。之前文章有介绍过,prometheus的存储的瓶颈和查询较大数据量,容易oom这两个问题。所以选择了heapster。此外,heapster不仅内部实现了很多aggregator和calculator,做了很多聚合层的工作。而采用prometheus,你需要在查询的时候做聚合。
heapster支持诸多metrics输出,称为sink。目前支持的sink如下图:
而我比较倾向于clickhouse数据库,关于clickhouse,其实前面的文章介绍过很多了。
所以本文主要讲如何为heapster增加clickhouse sink。
看代码,增加一种sink还是很简单的。典型的工厂设计模式,实现 Name,Stop,ExportData 接口方法即可。最后再提供一个初始化函数,供factory调用即可。
初始化方法 NewClickhouseSink具体代码:
config, err := clickhouse_common.BuildConfig(uri) if err != nil { return nil, err } client, err := sql.Open("clickhouse", config.DSN) if err != nil { glog.Errorf("connecting to clickhouse: %v", err) return nil, err } sink := &clickhouseSink{ c: *config, client: client, conChan: make(chan struct{}, config.Concurrency), } glog.Infof("created clickhouse sink with options: host:%s user:%s db:%s", config.Host, config.UserName, config.Database) return sink, nil
基本上就是获取配置文件,初始化clickhouse 的client。
在factory.go 中 build方法中,加入刚刚实现的初始化函数
func (this *SinkFactory) Build(uri flags.Uri) (core.DataSink, error) { switch uri.Key { case "elasticsearch": return elasticsearch.NewElasticSearchSink(&uri.Val) case "gcm": return gcm.CreateGCMSink(&uri.Val) case "stackdriver": return stackdriver.CreateStackdriverSink(&uri.Val) case "statsd": return statsd.NewStatsdSink(&uri.Val) case "graphite": return graphite.NewGraphiteSink(&uri.Val) case "hawkular": return hawkular.NewHawkularSink(&uri.Val) case "influxdb": return influxdb.CreateInfluxdbSink(&uri.Val) case "kafka": return kafka.NewKafkaSink(&uri.Val) case "librato": return librato.CreateLibratoSink(&uri.Val) case "log": return logsink.NewLogSink(), nil case "metric": return metricsink.NewMetricSink(140*time.Second, 15*time.Minute, []string{ core.MetricCpuUsageRate.MetricDescriptor.Name, core.MetricMemoryUsage.MetricDescriptor.Name}), nil case "opentsdb": return opentsdb.CreateOpenTSDBSink(&uri.Val) case "wavefront": return wavefront.NewWavefrontSink(&uri.Val) case "riemann": return riemann.CreateRiemannSink(&uri.Val) case "honeycomb": return honeycomb.NewHoneycombSink(&uri.Val) case "clickhouse": return clickhouse.NewClickhouseSink(&uri.Val) default: return nil, fmt.Errorf("Sink not recognized: %s", uri.Key) } }Name 和 Stop
func (sink *clickhouseSink) Name() string { return "clickhouse" } func (tsdbSink *clickhouseSink) Stop() { // Do nothing }
stop 函数在heapster关闭的时候调用,执行一些非托管资源的关闭。
ExportData这是核心的地方。
func (sink *clickhouseSink) ExportData(dataBatch *core.DataBatch) { sink.Lock() defer sink.Unlock() if err := sink.client.Ping(); err != nil { glog.Warningf("Failed to ping clickhouse: %v", err) return } dataPoints := make([]point, 0, 0) for _, metricSet := range dataBatch.MetricSets { for metricName, metricValue := range metricSet.MetricValues { var value float64 if core.ValueInt64 == metricValue.ValueType { value = float64(metricValue.IntValue) } else if core.ValueFloat == metricValue.ValueType { value = float64(metricValue.FloatValue) } else { continue } pt := point{ name: metricName, cluster: sink.c.ClusterName, val: value, ts: dataBatch.Timestamp, } for key, value := range metricSet.Labels { if _, exists := clickhouseBlacklistLabels[key]; !exists { if value != "" { if key == "labels" { lbs := strings.Split(value, ",") for _, lb := range lbs { ts := strings.Split(lb, ":") if len(ts) == 2 && ts[0] != "" && ts[1] != "" { pt.tags = append(pt.tags, fmt.Sprintf("%s=%s", ts[0], ts[1])) } } } else { pt.tags = append(pt.tags, fmt.Sprintf("%s=%s", key, value)) } } } } dataPoints = append(dataPoints, pt) if len(dataPoints) >= sink.c.BatchSize { sink.concurrentSendData(dataPoints) dataPoints = make([]point, 0, 0) } } } if len(dataPoints) >= 0 { sink.concurrentSendData(dataPoints) } sink.wg.Wait() }
主要有以下几个地方需要注意
数据的格式转换。需要将heapster 中DataBatch 转化为你目的存储的格式。其实这块做过pipeline 多output的人,很容易理解。
批量写入。一般在大数据量的时候,批量写入是一种有效的手段。
根据设置参数,并发写入目的存储。用到了golang的协程。下面这段代码实现了一个协程的发送数据。
func (sink *clickhouseSink) concurrentSendData(dataPoints []point) { sink.wg.Add(1) // use the channel to block until there"s less than the maximum number of concurrent requests running sink.conChan <- struct{}{} go func(dataPoints []point) { sink.sendData(dataPoints) }(dataPoints) }获取配置参数
这块在clickhouse.go中,主要做了获取配置参数和参数初始化一些默认值,以及对配置参数校验的工作。
dockerfile的更改原来的基础镜像是基于scratch
FROM scratch COPY heapster eventer / COPY ca-certificates.crt /etc/ssl/certs/ # nobody:nobody USER 65534:65534 ENTRYPOINT ["/heapster"]
由于需要改timezone的问题,改成了基于alpine。
FROM alpine RUN apk add -U tzdata RUN ln -sf /usr/share/zoneinfo/Asia/Shanghai /etc/localtime COPY heapster eventer / COPY ca-certificates.crt /etc/ssl/certs/ RUN chmod +x /heapster ENTRYPOINT ["/heapster"]
实际上,基于scratch增加timezone并且更改,也可以做到,只不过需要装一些包指令,结果就是镜像变大。与其如此,不如基于我比较熟悉的alpine实现。
总结fork的项目地址。实际运行日志截图:
由于ck的出色的写入性能,运行非常稳定。
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/32707.html
摘要:前言在资源审计和计费这块,容器和虚机有很大区别。支持诸多输出,称为。所以本文主要讲如何为增加。实际上,基于增加并且更改,也可以做到,只不过需要装一些包指令,结果就是镜像变大。实际运行日志截图由于的出色的写入性能,运行非常稳定。 前言 在k8s资源审计和计费这块,容器和虚机有很大区别。相对虚机来讲,容器不容易实现。资源指标收集可以采用heapster,也可以用prometheus。之前文...
摘要:源码版本简介是下的一个监控项目,用于进行容器集群的监控和性能分析。基本的功能及概念介绍可以回顾我之前的一篇文章监控之介绍。在源码分析之前我们先介绍的实现流程,由上图可以看出会从各个上获取相关的监控信息,然后进行汇总发送给后台数据库。 源码版本 heapster version: release-1.2 简介 Heapster是Kubernetes下的一个监控项目,用于进行容器集群的监控...
摘要:在每个上都会运行,它会收集本机以及容器的监控数据。使用这里主要介绍的使用,及可获取的。参考资料文档文档及可用在官方文档中都介绍的比较齐全。我们没有采用该方式,是考虑到如果和监控系统相互依赖,会导致异常之后,存在监控系统无法使用的隐患。 什么是Heapster? Heapster是容器集群监控和性能分析工具,天然的支持Kubernetes和CoreOS。Kubernetes有个出名的监控...
摘要:还可以把数据导入到第三方工具展示或使用场景共同组成了一个流行的监控解决方案原生的监控图表信息来自在中也用到了,将作为,向其获取,作为水平扩缩容的监控依据监控指标流程首先从获取集群中所有的信息。 概述 该项目将被废弃(RETIRED) Heapster是Kubernetes旗下的一个项目,Heapster是一个收集者,并不是采集 1.Heapster可以收集Node节点上的cAdvis...
阅读 3048·2021-11-22 15:29
阅读 1728·2021-10-12 10:11
阅读 1750·2021-09-04 16:45
阅读 2227·2021-08-25 09:39
阅读 2789·2021-08-18 10:20
阅读 2509·2021-08-11 11:17
阅读 446·2019-08-30 12:49
阅读 3305·2019-08-30 12:49