资讯专栏INFORMATION COLUMN

k8s与日志--采用golang实现Fluent Bit的output插件

binta / 1377人阅读

摘要:采用实现的插件前言目前社区日志采集和处理的组件不少,之前方案中的,社区中的,方案中的以及大数据用到比较多的。适合采用的方案,实现日志中心化收集的方案。主要负责采集,负责处理和传送。

采用golang实现Fluent Bit的output插件 前言

目前社区日志采集和处理的组件不少,之前elk方案中的logstash,cncf社区中的fluentd,efk方案中的filebeat,以及大数据用到比较多的flume。而Fluent Bit是一款用c语言编写的高性能的日志收集组件,整个架构源于fluentd。官方比较数据如下:

Fluentd Fluent Bit
Scope Containers / Servers Containers / Servers
Language C & Ruby C
Memory ~40MB ~450KB
Performance High Performance High Performance
Dependencies Built as a Ruby Gem, it requires a certain number of gems. Zero dependencies, unless some special plugin requires them.
Plugins More than 650 plugins available Around 35 plugins available
License Apache License v2.0 Apache License v2.0

通过数据可以看出,fluent bit 占用资源更少。适合采用fluent bit + fluentd 的方案,实现日志中心化收集的方案。fluent bit主要负责采集,fluentd负责处理和传送。

扩展output插件

fluent bit 本身是C语言编写,扩展插件有一定的难度。可能官方考虑到这一点,实现了fluent-bit-go,可以实现采用go语言来编写插件,目前只支持output的编写。
fluent-bit-go其实就是利用cgo,封装了c接口。代码比较简单,主要分析其中一个关键文件

package output

/*
#include 
#include "flb_plugin.h"
#include "flb_output.h"
*/
import "C"
import "fmt"
import "unsafe"

// Define constants matching Fluent Bit core
const FLB_ERROR               =  C.FLB_ERROR
const FLB_OK                  =  C.FLB_OK
const FLB_RETRY               =  C.FLB_RETRY

const FLB_PROXY_OUTPUT_PLUGIN =  C.FLB_PROXY_OUTPUT_PLUGIN
const FLB_PROXY_GOLANG        =  C.FLB_PROXY_GOLANG

// Local type to define a plugin definition
type FLBPlugin C.struct_flb_plugin_proxy
type FLBOutPlugin C.struct_flbgo_output_plugin

// When the FLBPluginInit is triggered by Fluent Bit, a plugin context
// is passed and the next step is to invoke this FLBPluginRegister() function
// to fill the required information: type, proxy type, flags name and
// description.
func FLBPluginRegister(ctx unsafe.Pointer, name string, desc string) int {
    p := (*FLBPlugin) (unsafe.Pointer(ctx))
    p._type = FLB_PROXY_OUTPUT_PLUGIN
    p.proxy = FLB_PROXY_GOLANG
    p.flags = 0
    p.name  = C.CString(name)
    p.description = C.CString(desc)
    return 0
}

// Release resources allocated by the plugin initialization
func FLBPluginUnregister(ctx unsafe.Pointer) {
    p := (*FLBPlugin) (unsafe.Pointer(ctx))
    fmt.Printf("[flbgo] unregistering %v
", p)
    C.free(unsafe.Pointer(p.name))
    C.free(unsafe.Pointer(p.description))
}

func FLBPluginConfigKey(ctx unsafe.Pointer, key string) string {
    _key := C.CString(key)
    return C.GoString(C.output_get_property(_key, unsafe.Pointer(ctx)))
}

主要是定义了一些编写插件需要用到的变量和方法,例如FLBPluginRegister注册组件,FLBPluginConfigKey获取配置文件设定参数等。
PS
实际上用golang调用fluent-bit-go,再加一些实际的业务逻辑实现,最终编译成一个c-share的.so动态链接库。

定制fluent-bit-kafka-ouput插件

实际上,fluent-bit v0.13版本以后就提供了kafka output的插件,但是实际项目中,并不满足我们的需求,必须定制化。
当然接下来的代码主要是作为一个demo,讲清楚如何编写一个output插件。

代码编写和分析

先上代码:

package main

import (
    "C"
    "fmt"
    "io"
    "log"
    "reflect"
    "strconv"
    "strings"
    "time"
    "unsafe"

    "github.com/Shopify/sarama"
    "github.com/fluent/fluent-bit-go/output"
    "github.com/ugorji/go/codec"
)

var (
    brokers    []string
    producer   sarama.SyncProducer
    timeout    = 0 * time.Minute
    topic      string
    module     string
    messageKey string
)

//export FLBPluginRegister
func FLBPluginRegister(ctx unsafe.Pointer) int {
    return output.FLBPluginRegister(ctx, "out_kafka", "Kafka Output Plugin.!")
}

//export FLBPluginInit
// ctx (context) pointer to fluentbit context (state/ c code)
func FLBPluginInit(ctx unsafe.Pointer) int {

    if bs := output.FLBPluginConfigKey(ctx, "brokers"); bs != "" {
        brokers = strings.Split(bs, ",")
    } else {
        log.Printf("you must set brokers")
        return output.FLB_ERROR
    }

    if tp := output.FLBPluginConfigKey(ctx, "topics"); tp != "" {
        topic = tp
    } else {
        log.Printf("you must set topics")
        return output.FLB_ERROR
    }

    if mo := output.FLBPluginConfigKey(ctx, "module"); mo != "" {
        module = mo
    } else {
        log.Printf("you must set module")
        return output.FLB_ERROR
    }

    if key := output.FLBPluginConfigKey(ctx, "message_key"); key != "" {
        messageKey = key
    } else {
        log.Printf("you must set message_key")
        return output.FLB_ERROR
    }

    config := sarama.NewConfig()
    config.Producer.Return.Successes = true

    if required_acks := output.FLBPluginConfigKey(ctx, "required_acks"); required_acks != "" {
        if acks, err := strconv.Atoi(required_acks); err == nil {
            config.Producer.RequiredAcks = sarama.RequiredAcks(acks)
        }
    }

    if compression_codec := output.FLBPluginConfigKey(ctx, "compression_codec"); compression_codec != "" {
        if codec, err := strconv.Atoi(compression_codec); err == nil {
            config.Producer.Compression = sarama.CompressionCodec(codec)
        }
    }

    if max_retry := output.FLBPluginConfigKey(ctx, "max_retry"); max_retry != "" {
        if max_retry, err := strconv.Atoi(max_retry); err == nil {
            config.Producer.Retry.Max = max_retry
        }
    }

    if timeout == 0 {
        timeout = 5 * time.Minute
    }
    // If Kafka is not running on init, wait to connect
    deadline := time.Now().Add(timeout)
    for tries := 0; time.Now().Before(deadline); tries++ {
        var err error
        if producer == nil {
            producer, err = sarama.NewSyncProducer(brokers, config)
        }
        if err == nil {
            return output.FLB_OK
        }
        log.Printf("Cannot connect to Kafka: (%s) retrying...", err)
        time.Sleep(time.Second * 30)
    }

    log.Printf("Kafka failed to respond after %s", timeout)
    return output.FLB_ERROR

}

//export FLBPluginFlush
// FLBPluginFlush is called from fluent-bit when data need to be sent. is called from fluent-bit when data need to be sent.
func FLBPluginFlush(data unsafe.Pointer, length C.int, tag *C.char) int {
    var h codec.MsgpackHandle

    var b []byte
    var m interface{}
    var err error

    b = C.GoBytes(data, length)
    dec := codec.NewDecoderBytes(b, &h)

    // Iterate the original MessagePack array
    var msgs []*sarama.ProducerMessage
    for {
        // decode the msgpack data
        err = dec.Decode(&m)
        if err != nil {
            if err == io.EOF {
                break
            }
            log.Printf("Failed to decode msgpack data: %v
", err)
            return output.FLB_ERROR
        }

        // Get a slice and their two entries: timestamp and map
        slice := reflect.ValueOf(m)
        data := slice.Index(1)

        // Convert slice data to a real map and iterate
        mapData := data.Interface().(map[interface{}]interface{})
        flattenData, err := Flatten(mapData, "", UnderscoreStyle)
        if err != nil {
            break
        }

        message := ""
        host := ""

        for k, v := range flattenData {
            value := ""
            switch t := v.(type) {
            case string:
                value = t
            case []byte:
                value = string(t)
            default:
                value = fmt.Sprintf("%v", v)
            }

            if k == "pod_name" {
                host = value
            }

            if k == messageKey {
                message = value
            }

        }
        if message == "" || host == "" {
            break
        }

        m := &sarama.ProducerMessage{
            Topic: topic,
            Key:   sarama.StringEncoder(fmt.Sprintf("host=%s|module=%s", host, module)),
            Value: sarama.ByteEncoder(message),
        }
        msgs = append(msgs, m)

    }

    err = producer.SendMessages(msgs)
    if err != nil {
        log.Printf("FAILED to send kafka message: %s
", err)
        return output.FLB_ERROR
    }
    return output.FLB_OK
}

//export FLBPluginExit
func FLBPluginExit() int {
    producer.Close()
    return output.FLB_OK
}

func main() {
}

FLBPluginExit 插件退出的时候需要执行的一些方法,比如关闭连接。

FLBPluginRegister 注册插件

FLBPluginInit 插件初始化

FLBPluginFlush flush到数据到output

FLBPluginConfigKey 获取配置文件中参数

PS
当然除了FLBPluginConfigKey之外,也可以通过获取环境变量来获得设置参数。
ctx相当于一个上下文,负责之间的数据的传递。

编译和执行

编译的时候

go build -buildmode=c-shared -o out_kafka.so .

生成out_kafka.so

执行的时候

/fluent-bit/bin/fluent-bit" -c /fluent-bit/etc/fluent-bit.conf -e /fluent-bit/out_kafka.so
总结

采用类似的编写结构,就可以定制化自己的输出插件了。

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/32693.html

相关文章

  • k8s日志--采用golang实现Fluent Bitoutput插件

    摘要:采用实现的插件前言目前社区日志采集和处理的组件不少,之前方案中的,社区中的,方案中的以及大数据用到比较多的。适合采用的方案,实现日志中心化收集的方案。主要负责采集,负责处理和传送。 采用golang实现Fluent Bit的output插件 前言 目前社区日志采集和处理的组件不少,之前elk方案中的logstash,cncf社区中的fluentd,efk方案中的filebeat,以及大...

    岳光 评论0 收藏0
  • k8slog--利用fluent bit收集k8s日志

    摘要:是一个开源和多平台的,它允许您从不同的来源收集数据日志,统一并将它们发送到多个目的地。例如日志收集日志分析主要讲部署的集群。日志主要有和的日志,一般采用部署,自然而然就是要支持格式日志的采集。业务落盘的日志。部署方案采取部署。 前言 收集日志的组件多不胜数,有ELK久负盛名组合中的logstash, 也有EFK组合中的filebeat,更有cncf新贵fluentd,另外还有大数据领域...

    betacat 评论0 收藏0
  • k8slog--利用fluent bit收集k8s日志

    摘要:是一个开源和多平台的,它允许您从不同的来源收集数据日志,统一并将它们发送到多个目的地。例如日志收集日志分析主要讲部署的集群。日志主要有和的日志,一般采用部署,自然而然就是要支持格式日志的采集。业务落盘的日志。部署方案采取部署。 前言 收集日志的组件多不胜数,有ELK久负盛名组合中的logstash, 也有EFK组合中的filebeat,更有cncf新贵fluentd,另外还有大数据领域...

    CoffeX 评论0 收藏0
  • 关于k8s集群容器日志收集总结

    摘要:我推荐你使用进行日志收集,将作为的出口。集群目前暂时没有提供日志查看机制。以如下的形式启动容器,容器日志将发往配置的。 【作者barnett】本文介绍了k8s官方提供的日志收集方法,并介绍了Fluentd日志收集器并与其他产品做了比较。最后介绍了好雨云帮如何对k8s进行改造并使用ZeroMQ以消息的形式将日志传输到统一的日志处理中心。 容器日志存在形式 目前容器日志有两种输出形式: ...

    jeffrey_up 评论0 收藏0
  • 关于k8s集群容器日志收集总结

    摘要:我推荐你使用进行日志收集,将作为的出口。集群目前暂时没有提供日志查看机制。以如下的形式启动容器,容器日志将发往配置的。 【作者barnett】本文介绍了k8s官方提供的日志收集方法,并介绍了Fluentd日志收集器并与其他产品做了比较。最后介绍了好雨云帮如何对k8s进行改造并使用ZeroMQ以消息的形式将日志传输到统一的日志处理中心。 容器日志存在形式 目前容器日志有两种输出形式: ...

    or0fun 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<