资讯专栏INFORMATION COLUMN

k8s与日志--journalbeat源码解读

Amio / 1613人阅读

摘要:但是也存在诸多的问题,随着新设备的出现以及对安全的重视,这些缺点越发显得突出,例如日志消息内容无法验证数据格式松散日志检索低效有限的元数据保存无法记录二进制数据等。该服务可以为项目增加一定数量的元数据。

前言

对于日志系统的重要性不言而喻,参照沪江的一篇关于日志系统的介绍,基本上日志数据在以下几方面具有非常重要的作用:

数据查找:通过检索日志信息,定位相应的 bug ,找出解决方案

服务诊断:通过对日志信息进行统计、分析,了解服务器的负荷和服务运行状态

数据分析:可以做进一步的数据分析,比如根据请求中的课程 id ,找出 TOP10 用户感兴趣课程

日志+大数据+AI的确有很多想象空间。
而对于收集系统,流行的技术stack有之前的elk,到现在的efk。logstash换成了filebeat。当然日志收集agent,也有flume和fluentd,尤其fluentd属于cncf组织的产品,在k8s中有着广泛的应用。但是fluentd是ruby写的,不利于深入源码了解。当然今天我们重点讲的是另外一个agent--journalbeat。望文生义,隶属于efk stack 中beats系列中的一员,专门用于收集journald日志。

journalbeat源码解读 journald日志简介

长久以来 syslog 是每一个 Unix 系统中的重要部件。在漫长的历史中在各种 Linux 发行版中都有不同的实现去完成类似的工作,它们采取的是逻辑相近,并使用基本相同的文件格式。但是 syslog 也存在诸多的问题,随着新设备的出现以及对安全的重视,这些缺点越发显得突出,例如日志消息内容无法验证、数据格式松散、日志检索低效、有限的元数据保存、无法记录二进制数据等。
Journald是针对以上需求的解决方案。受udev事件启发,Journal 条目与环境组块相似。一个键值域,按照换行符分开,使用大写的变量名。除了支持ASCII 格式的字符串外,还能够支持二进制数据,如 ATA SMART 健康信息、SCSI 数据。应用程序和服务可以通过将项目域传递给systemd journald服务来生成项目。该服务可以为项目增加一定数量的元数据。这些受信任域的值由 Journal 服务来决定且无法由客户端来伪造。在Journald中,可以把日志数据导出,在异地读取,并不受处理器架构的影响。这对嵌入式设备是很有用的功能,方便维护人员分析设备运行状况。
大致总结就是

journald日志是新的linux系统的具备的

journald区别于传统的文件存储方式,是二进制存储。需要用journalctl查看。

docker对于journald的支持

The journald logging driver sends container logs to the systemd journal. Log entries can be retrieved using the journalctl command, through use of the journal API, or using the docker logs command.
即docker除了json等日志格式,已经增加了journald驱动。

目前本司使用场景

我们的k8s集群,所有的docker输出的日志格式都采用journald,这样主机centos系统日志和docker的日志都用journalbeat来收集。

journalbeat实现关键

journalbeat整个实现过程,基本上两点:

与其他社区贡献的beats系列,比如packetbeat,mysqlbeat类似,遵循了beats的框架和约定,journalbeat实现了run和stop等方法即可,然后作为一个客户端,将收集到的数据,publish到beats中。

读取journald日志,采用了coreos开源的go-systemd库中sdjournal部分。其实sdjournal是一个利用cgo 对于journald日志c接口的封装。

源码解读

程序入口:

package main

import (
    "log"

    "github.com/elastic/beats/libbeat/beat"
    "github.com/mheese/journalbeat/beater"
)

func main() {
    err := beat.Run("journalbeat", "", beater.New)
    if err != nil {
        log.Fatal(err)
    }
}

整个journalbeat共实现了3个方法即可。run,stop,和new。
run和stop顾名思义,就是beats控制journalbeat的运行和停止。
而new:
需要按照

// Creator initializes and configures a new Beater instance used to execute
// the beat its run-loop.
type Creator func(*Beat, *common.Config) (Beater, error)

实现Creator方法,返回的Beater实例,交由beats控制。
具体实现:

// New creates beater
func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) {
    config := config.DefaultConfig
    var err error
    if err = cfg.Unpack(&config); err != nil {
        return nil, fmt.Errorf("Error reading config file: %v", err)
    }

    jb := &Journalbeat{
        config:     config,
        done:       make(chan struct{}),
        cursorChan: make(chan string),
        pending:    make(chan *eventReference),
        completed:  make(chan *eventReference, config.PendingQueue.CompletedQueueSize),
    }

    if err = jb.initJournal(); err != nil {
        logp.Err("Failed to connect to the Systemd Journal: %v", err)
        return nil, err
    }

    jb.client = b.Publisher.Connect()
    return jb, nil
}

一般的beats中,都会有一些共同属性。例如下面的done和client属性。

// Journalbeat is the main Journalbeat struct
type Journalbeat struct {
    done   chan struct{}
    config config.Config
    client publisher.Client

    journal *sdjournal.Journal

    cursorChan         chan string
    pending, completed chan *eventReference
    wg                 sync.WaitGroup
}

done是一个控制整个beater启停的信号量。
而client 是与beats平台通信的client。注意在初始化的时候,

jb.client = b.Publisher.Connect()

建立链接。
然后在收集到数据,发送的时候,也是通过该client

select {
        case <-jb.done:
            return nil
        default:
            // we need to clone to avoid races since map is a pointer...
            jb.client.PublishEvent(ref.body.Clone(), publisher.Signal(&eventSignal{ref, jb.completed}), publisher.Guaranteed)
        }

注意上边的发送姿势和对于刚才提到的done信号量使用。
其他方法都是业务相关不再详细解读了。

journalbeat如何保证发送失败的日志重新发送

关于这点,个人感觉是最优雅的部分

所有发送失败的日志是会在程序结束之前以json格式保存到文件,完成持久化。
    // on exit fully consume both queues and flush to disk the pending queue
    defer func() {
        var wg sync.WaitGroup
        wg.Add(2)

        go func() {
            defer wg.Done()
            for evRef := range jb.pending {
                pending[evRef.cursor] = evRef.body
            }
        }()

        go func() {
            defer wg.Done()
            for evRef := range jb.completed {
                completed[evRef.cursor] = evRef.body
            }
        }()
        wg.Wait()

        logp.Info("Saving the pending queue, consists of %d messages", len(diff(pending, completed)))
        if err := flush(diff(pending, completed), jb.config.PendingQueue.File); err != nil {
            logp.Err("error writing pending queue %s: %s", jb.config.PendingQueue.File, err)
        }
    }()
程序启动以后首先会读取之前持久化的发送失败的日志,重新发送
// load the previously saved queue of unsent events and try to publish them if any
    if err := jb.publishPending(); err != nil {
        logp.Warn("could not read the pending queue: %s", err)
    }
client publish收集到的日志到beats,设置了publisher.Guaranteed模式,成功和失败都有反馈
jb.client.PublishEvent(ref.body.Clone(), publisher.Signal(&eventSignal{ref, jb.completed}), publisher.Guaranteed)

其中publisher.Signal(&eventSignal{ref, jb.completed})类似于一个回调,凡是成功的都会写成功的ref到jb.completed中。方便客户端控制。

维护了两个chan,一个存放客户端发送的日志,一个存放服务端接受成功的日志,精确对比,可获取发送失败的日志,进入重发动作

journalbeat struct中有下面两个属性

    pending, completed chan *eventReference

每次客户端发送一条日志,都会写到pending。

case publishedChan <- jb.client.PublishEvent(event, publisher.Signal(&eventSignal{ref, jb.completed}), publisher.Guaranteed):
            if published := <-publishedChan; published {
                jb.pending <- ref

                // save cursor
                if jb.config.WriteCursorState {
                    jb.cursorChan <- rawEvent.Cursor
                }
            }
        }

publisher.Signal(&eventSignal{ref, jb.completed}),回调会将成功的写到completed。
整个程序同时会启动一个
go jb.managePendingQueueLoop()
协程,专门用来定时重发失败日志。

// managePendingQueueLoop runs the loop which manages the set of events waiting to be acked
func (jb *Journalbeat) managePendingQueueLoop() {
    jb.wg.Add(1)
    defer jb.wg.Done()
    pending := map[string]common.MapStr{}
    completed := map[string]common.MapStr{}

    // diff returns the difference between this map and the other.
    diff := func(this, other map[string]common.MapStr) map[string]common.MapStr {
        result := map[string]common.MapStr{}
        for k, v := range this {
            if _, ok := other[k]; !ok {
                result[k] = v
            }
        }
        return result
    }

    // flush saves the map[string]common.MapStr to the JSON file on disk
    flush := func(source map[string]common.MapStr, dest string) error {
        tempFile, err := ioutil.TempFile(filepath.Dir(dest), fmt.Sprintf(".%s", filepath.Base(dest)))
        if err != nil {
            return err
        }

        if err = json.NewEncoder(tempFile).Encode(source); err != nil {
            _ = tempFile.Close()
            return err
        }

        _ = tempFile.Close()
        return os.Rename(tempFile.Name(), dest)
    }

    // on exit fully consume both queues and flush to disk the pending queue
    defer func() {
        var wg sync.WaitGroup
        wg.Add(2)

        go func() {
            defer wg.Done()
            for evRef := range jb.pending {
                pending[evRef.cursor] = evRef.body
            }
        }()

        go func() {
            defer wg.Done()
            for evRef := range jb.completed {
                completed[evRef.cursor] = evRef.body
            }
        }()
        wg.Wait()

        logp.Info("Saving the pending queue, consists of %d messages", len(diff(pending, completed)))
        if err := flush(diff(pending, completed), jb.config.PendingQueue.File); err != nil {
            logp.Err("error writing pending queue %s: %s", jb.config.PendingQueue.File, err)
        }
    }()

    // flush the pending queue to disk periodically
    tick := time.Tick(jb.config.PendingQueue.FlushPeriod)
    for {
        select {
        case <-jb.done:
            return
        case p, ok := <-jb.pending:
            if ok {
                pending[p.cursor] = p.body
            }
        case c, ok := <-jb.completed:
            if ok {
                completed[c.cursor] = c.body
            }
        case <-tick:
            result := diff(pending, completed)
            if err := flush(result, jb.config.PendingQueue.File); err != nil {
                logp.Err("error writing %s: %s", jb.config.PendingQueue.File, err)
            }
            pending = result
            completed = map[string]common.MapStr{}
        }
    }
}
总结

当然还有一些其他的细节,不再一一讲述了。比如定时写Cursor的功能和日志格式转换等。具体的大家可以看源码。主要是讲了我认为其优雅的部分和为beats编写beater的要点。

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/33029.html

相关文章

  • k8s日志--journalbeat源码解读

    摘要:但是也存在诸多的问题,随着新设备的出现以及对安全的重视,这些缺点越发显得突出,例如日志消息内容无法验证数据格式松散日志检索低效有限的元数据保存无法记录二进制数据等。该服务可以为项目增加一定数量的元数据。 前言 对于日志系统的重要性不言而喻,参照沪江的一篇关于日志系统的介绍,基本上日志数据在以下几方面具有非常重要的作用: 数据查找:通过检索日志信息,定位相应的 bug ,找出解决方案 ...

    jemygraw 评论0 收藏0
  • 快收藏!52篇25万字,微服务、云原生、容器、K8S、Serverless精华文章集锦

    摘要:正在走远,新年之初,小数精选过去一年阅读量居高的技术干货,从容器到微服务云原生,汇集成篇精华集锦,充分反映了这一年的技术热点走向。此文值得收藏,方便随时搜索和查看。,小数将继续陪伴大家,为朋友们奉献更有逼格的技术内容。 2017正在走远,新年之初,小数精选过去一年阅读量居高的技术干货,从容器、K8S 到微服务、云原生、Service Mesh,汇集成52篇精华集锦,充分反映了这一年的技...

    AaronYuan 评论0 收藏0
  • k8s网络--Flannel源码分析

    摘要:今天主要针对版本进行源码分析。外部接口的定义如下创建子网管理器负责子网的创建更新添加删除监听等,主要和打交道定义续约。在到期之前,子网管理器调用该方法进行续约。 前言 之前在k8s与网络--Flannel解读一文中,我们主要讲了Flannel整体的工作原理。今天主要针对Flannel v0.10.0版本进行源码分析。首先需要理解三个比较重要的概念: 网络(Network):整个集群中...

    wpw 评论0 收藏0
  • k8s网络--Flannel源码分析

    摘要:今天主要针对版本进行源码分析。外部接口的定义如下创建子网管理器负责子网的创建更新添加删除监听等,主要和打交道定义续约。在到期之前,子网管理器调用该方法进行续约。 前言 之前在k8s与网络--Flannel解读一文中,我们主要讲了Flannel整体的工作原理。今天主要针对Flannel v0.10.0版本进行源码分析。首先需要理解三个比较重要的概念: 网络(Network):整个集群中...

    hoohack 评论0 收藏0
  • k8s网络--Flannel源码分析

    摘要:今天主要针对版本进行源码分析。外部接口的定义如下创建子网管理器负责子网的创建更新添加删除监听等,主要和打交道定义续约。在到期之前,子网管理器调用该方法进行续约。 前言 之前在k8s与网络--Flannel解读一文中,我们主要讲了Flannel整体的工作原理。今天主要针对Flannel v0.10.0版本进行源码分析。首先需要理解三个比较重要的概念: 网络(Network):整个集群中...

    Jeffrrey 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<