资讯专栏INFORMATION COLUMN

k8s与监控--改造telegraf的buffer实现

everfly / 3002人阅读

摘要:改造的实现前言最近在使用的场景中,要求数据在程序意外终止的时候不丢失。按照最初的原始实现在内部维护了两个,分别是和。基于实现的持久化在持久化机制的选型中,优先实现。结语改造以后,可以根据自己的需求通过配置文件来决定使用或是来实现。

改造telegraf的buffer实现 前言

最近在使用telegraf的场景中,要求数据在程序意外终止的时候不丢失。按照telegraf最初的原始实现,在running_output内部维护了两个buffer,分别是metrics和failMetrics。这两个buffer是基于go中channel实现的。由于没有持久化机制,在意外退出的时候,存在丢失数据的风险。所以这篇文章主要讲述之前telegraf保证数据安全的一些措施和我们对代码的一些优化。

telegraf关于数据安全的处理办法

关于两个buffer,定义在running_output.go的struct中。

// RunningOutput contains the output configuration
type RunningOutput struct {
    Name              string
    Output            telegraf.Output
    Config            *OutputConfig
    MetricBufferLimit int
    MetricBatchSize   int

    MetricsFiltered selfstat.Stat
    MetricsWritten  selfstat.Stat
    BufferSize      selfstat.Stat
    BufferLimit     selfstat.Stat
    WriteTime       selfstat.Stat

    metrics     *buffer.Buffer
    failMetrics *buffer.Buffer

    // Guards against concurrent calls to the Output as described in #3009
    sync.Mutex
}

这个两个buffer的大小提供了配置参数可以设置。

metrics:           buffer.NewBuffer(batchSize),
failMetrics:       buffer.NewBuffer(bufferLimit),

顾名思义。metrics存放要发送到指定output的metric,而failMetrics存放发送失败的metric。当然失败的metrics会在telegraf重发机制下再次发送。

    if ro.metrics.Len() == ro.MetricBatchSize {
        batch := ro.metrics.Batch(ro.MetricBatchSize)
        err := ro.write(batch)
        if err != nil {
            ro.failMetrics.Add(batch...)
        }
    }

在向metrics增加metrics的时候,做是否达到批量发送的数量,如果达到就调用发送方法。当然还有定时的解决方案,如果一直没有达到MetricBatchSize,也会在一定时间后发送数据。具体实现代码在agent.go中

    ticker := time.NewTicker(a.Config.Agent.FlushInterval.Duration)
    semaphore := make(chan struct{}, 1)
    for {
        select {
        case <-shutdown:
            log.Println("I! Hang on, flushing any cached metrics before shutdown")
            // wait for outMetricC to get flushed before flushing outputs
            wg.Wait()
            a.flush()
            return nil
        case <-ticker.C:
            go func() {
                select {
                case semaphore <- struct{}{}:
                    internal.RandomSleep(a.Config.Agent.FlushJitter.Duration, shutdown)
                    a.flush()
                    <-semaphore
                default:
                    // skipping this flush because one is already happening
                    log.Println("W! Skipping a scheduled flush because there is" +
                        " already a flush ongoing.")
                }
            }()

在程序接受到停止信号后,程序会首先flush剩下的数据到output中,然后退出进程。这样可以保证一定的数据安全。

基于redis实现buffer的持久化

在持久化机制的选型中,优先实现redis。本身redis性能高,而且具备完善的持久化。
具体的实现架构如下:

将原buffer中功能抽象出buffer.go接口。
具体代码:

package buffer

import (
    "github.com/influxdata/telegraf"
    "github.com/influxdata/telegraf/internal/buffer/memory"
    "github.com/influxdata/telegraf/internal/buffer/redis"
)

const (
    BufferTypeForMemory = "memory"
    BufferTypeForRedis  = "redis"
)

type Buffer interface {
    IsEmpty() bool
    Len() int
    Add(metrics ...telegraf.Metric)
    Batch(batchSize int) []telegraf.Metric
}

func NewBuffer(mod string, size int, key, addr string) Buffer {
    switch mod {
    case BufferTypeForRedis:
        return redis.NewBuffer(size, key, addr)
    default:
        return memory.NewBuffer(size)
    }
}

然后分别内存和redis实现了Buffer接口。
其中NewBuffer相当于一个工厂方法。
当然在后期可以实现基于file和db等buffer实现,来满足不同的场景和要求。

redis实现buffer的要点

由于要满足先进先出的要求,选择了redis的list数据结构。redis中的list是一个字符串list,所以telegraf中metric数据接口要符合序列化的要求。比如属性需要可导出,即public。所以这点需要改动telegraf对于metric struct的定义。另外可以选择json或是msgpack等序列化方式。我们这边是采用的json序列化的方式。

结语

改造以后,可以根据自己的需求通过配置文件来决定使用channel或是redis来实现buffer。各有优劣,内存实现的话,性能高,受到的依赖少。而redis这种分布式存储,决定了数据安全,但是性能会有一定的损耗,毕竟有大量的序列化和反序列化以及网络传输,当然依赖也增加了,取决于redis的可靠性,建议redis集群部署。

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/32658.html

相关文章

  • 利用TICK搭建Docker容器可视化监控中心

    摘要:在我的前文容器可视化监控中心搭建之中我们就实践过容器的可视化监控,在那篇文章中我们是使用了技术栈来完成的。 showImg(https://segmentfault.com/img/remote/1460000015484084); 概述 性能监控是容器服务必不可少的基础设施,容器化应用运行于宿主机上,我们需要知道该容器的运行情况,包括 CPU使用率、内存占用、网络状况以及磁盘空间等...

    LiuZh 评论0 收藏0
  • FastD 最佳实践四: 构建系统可视化监控

    摘要:的展示非常炫酷,绝对是运维提升逼格的一大利器。另外的可视化功能比强得多,而且以上版本将集成报警功能。它由写成,着力于高性能地查询与存储时序型数据。被广泛应用于存储系统的监控数据,行业的实时数据等场景。 原有监控系统 showImg(https://segmentfault.com/img/remote/1460000011082384); 整个系统以 Graphite (carbon ...

    khlbat 评论0 收藏0
  • 拉勾网基于 UK8S平台容器化改造实践

    摘要:宋体本文从拉勾网的业务架构日志采集监控服务暴露调用等方面介绍了其基于的容器化改造实践。宋体此外,拉勾网还有一套自研的环境的业务发布系统,不过这套发布系统未适配容器环境。写在前面 拉勾网于 2019 年 3 月份开始尝试将生产环境的业务从 UHost 迁移到 UK8S,截至 2019 年 9 月份,QA 环境的大部分业务模块已经完成容器化改造,生产环境中,后台管理服务已全部迁移到 UK8...

    CoorChice 评论0 收藏0
  • 这么多监控组件,总有一款适合你

    摘要:典型实现不同的监控模块,侧重于不同领域,有着不同的职责。指标收集方面,支持多样化的组件将被优先下使用。以上谈了这么多,仅仅是聊了一下收集方面而已。 更多文章,请移步微信公众号《小姐姐味道》 mp原文 https://mp.weixin.qq.com/s?__...监控是分布式系统的必备组件,能够起到提前预警、问题排查、评估决策等功效,乃行走江湖、居家必备之良品。 监控系统概要 功能划分...

    simon_chen 评论0 收藏0
  • 这么多监控组件,总有一款适合你

    摘要:典型实现不同的监控模块,侧重于不同领域,有着不同的职责。指标收集方面,支持多样化的组件将被优先下使用。以上谈了这么多,仅仅是聊了一下收集方面而已。 更多文章,请移步微信公众号《小姐姐味道》 mp原文 https://mp.weixin.qq.com/s?__...监控是分布式系统的必备组件,能够起到提前预警、问题排查、评估决策等功效,乃行走江湖、居家必备之良品。 监控系统概要 功能划分...

    wpw 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<