资讯专栏INFORMATION COLUMN

日志服务Flink Connector《支持Exactly Once》

endiat / 1037人阅读

摘要:是阿里云日志服务提供的,用于对接的工具,包括两部分,消费者和生产者。子用户使用需要授权如下几个用于将数据写到阿里云日志服务中。

摘要: Flink log connector是阿里云日志服务推出的,用于对接Flink的工具,包含两块,分别是消费者和生产者,消费者用于从日志服务中读数据,支持exactly once语义,生产者用于将数据写到日志服务中,该Connector隐藏了日志服务的一些概念,比如Shard的分裂合并等,用户在使用时只需要专注在自己的业务逻辑即可。

阿里云日志服务是针对实时数据一站式服务,用户只需要将精力集中在分析上,过程中数据采集、对接各种存储计算、数据索引和查询等琐碎工作等都可以交给日志服务完成。

日志服务中最基础的功能是LogHub,支持数据实时采集与消费,实时消费家族除 Spark Streaming、Storm、StreamCompute(Blink外),目前新增Flink啦。

Flink Connector
Flink log connector是阿里云日志服务提供的,用于对接flink的工具,包括两部分,消费者(Consumer)和生产者(Producer)。

消费者用于从日志服务中读取数据,支持exactly once语义,支持shard负载均衡.
生产者用于将数据写入日志服务,使用connector时,需要在项目中添加maven依赖:


            org.apache.flink
            flink-streaming-java_2.11
            1.3.2


            com.aliyun.openservices
            flink-log-connector
            0.1.3


            com.google.protobuf
            protobuf-java
            2.5.0

 
            com.aliyun.openservices
            aliyun-log
            0.6.10
 

            com.aliyun.openservices
            log-loghub-producer
            0.1.8

代码:Github

用法
请参考日志服务文档,正确创建Logstore。
如果使用子账号访问,请确认正确设置了LogStore的RAM策略。参考授权RAM子用户访问日志服务资源。
1. Log Consumer
在Connector中, 类FlinkLogConsumer提供了订阅日志服务中某一个LogStore的能力,实现了exactly once语义,在使用时,用户无需关心LogStore中shard数
量的变化,consumer会自动感知。

flink中每一个子任务负责消费LogStore中部分shard,如果LogStore中shard发生split或者merge,子任务消费的shard也会随之改变。

1.1 配置启动参数

Properties configProps = new Properties();
// 设置访问日志服务的域名
configProps.put(ConfigConstants.LOG_ENDPOINT, "cn-hangzhou.log.aliyuncs.com");
// 设置访问ak
configProps.put(ConfigConstants.LOG_ACCESSSKEYID, "");
configProps.put(ConfigConstants.LOG_ACCESSKEY, "");
// 设置日志服务的project
configProps.put(ConfigConstants.LOG_PROJECT, "ali-cn-hangzhou-sls-admin");
// 设置日志服务的LogStore
configProps.put(ConfigConstants.LOG_LOGSTORE, "sls_consumergroup_log");
// 设置消费日志服务起始位置
configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, Consts.LOG_END_CURSOR);
// 设置日志服务的消息反序列化方法
RawLogGroupListDeserializer deserializer = new RawLogGroupListDeserializer();
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream logTestStream = env.addSource(
        new FlinkLogConsumer(deserializer, configProps));

上面是一个简单的消费示例,我们使用java.util.Properties作为配置工具,所有Consumer的配置都可以在ConfigConstants中找到。

注意,flink stream的子任务数量和日志服务LogStore中的shard数量是独立的,如果shard数量多于子任务数量,每个子任务不重复的消费多个shard,如果少于,

那么部分子任务就会空闲,等到新的shard产生。

1.2 设置消费起始位置
Flink log consumer支持设置shard的消费起始位置,通过设置属性ConfigConstants.LOG_CONSUMER_BEGIN_POSITION,就可以定制消费从shard的头尾或者某个特定时间开始消费,具体取值如下:

Consts.LOG_BEGIN_CURSOR: 表示从shard的头开始消费,也就是从shard中最旧的数据开始消费。
Consts.LOG_END_CURSOR: 表示从shard的尾开始,也就是从shard中最新的数据开始消费。
UnixTimestamp: 一个整型数值的字符串,用1970-01-01到现在的秒数表示, 含义是消费shard中这个时间点之后的数据。
三种取值举例如下:

configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, Consts.LOG_BEGIN_CURSOR);
configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, Consts.LOG_END_CURSOR);
configProps.put(ConfigConstants.LOG_CONSUMER_BEGIN_POSITION, "1512439000");

1.3 监控:消费进度(可选)
Flink log consumer支持设置消费进度监控,所谓消费进度就是获取每一个shard实时的消费位置,这个位置使用时间戳表示,详细概念可以参考
文档消费组-查看状态,[消费组-监控报警
](https://help.aliyun.com/docum...。

configProps.put(ConfigConstants.LOG_CONSUMERGROUP, "your consumer group name”);

注意上面代码是可选的,如果设置了,consumer会首先创建consumerGroup,如果已经存在,则什么都不做,consumer中的snapshot会自动同步到日志服务的consumerGroup中,用户可以在日志服务的控制台查看consumer的消费进度。

1.4 容灾和exactly once语义支持
当打开Flink的checkpointing功能时,Flink log consumer会周期性的将每个shard的消费进度保存起来,当作业失败时,flink会恢复log consumer,并
从保存的最新的checkpoint开始消费。

写checkpoint的周期定义了当发生失败时,最多多少的数据会被回溯,也就是重新消费,使用代码如下:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 开启flink exactly once语义
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 每5s保存一次checkpoint
env.enableCheckpointing(5000);

更多Flink checkpoint的细节请参考Flink官方文档Checkpoints。

1.5 补充材料:关联 API与权限设置
Flink log consumer 会用到的阿里云日志服务接口如下:

GetCursorOrData

用于从shard中拉数据, 注意频繁的调用该接口可能会导致数据超过日志服务的shard quota, 可以通过ConfigConstants.LOG_FETCH_DATA_INTERVAL_MILLIS和ConfigConstants.LOG_MAX_NUMBER_PER_FETCH
控制接口调用的时间间隔和每次调用拉取的日志数量,shard的quota参考文章[shard简介](https://help.aliyun.com/document_detail/28976.html).
configProps.put(ConfigConstants.LOG_FETCH_DATA_INTERVAL_MILLIS, "100");
configProps.put(ConfigConstants.LOG_MAX_NUMBER_PER_FETCH, "100");

ListShards

 用于获取logStore中所有的shard列表,获取shard状态等.如果您的shard经常发生分裂合并,可以通过调整接口的调用周期来及时发现shard的变化。
// 设置每30s调用一次ListShards
configProps.put(ConfigConstants.LOG_SHARDS_DISCOVERY_INTERVAL_MILLIS, "30000");

CreateConsumerGroup

该接口调用只有当设置消费进度监控时才会发生,功能是创建consumerGroup,用于同步checkpoint。

ConsumerGroupUpdateCheckPoint

该接口用户将flink的snapshot同步到日志服务的consumerGroup中。

子用户使用Flink log consumer需要授权如下几个RAM Policy:

Log Producer

FlinkLogProducer 用于将数据写到阿里云日志服务中。

注意producer只支持Flink at-least-once语义,这就意味着在发生作业失败的情况下,写入日志服务中的数据有可能会重复,但是绝对不会丢失。

用法示例如下,我们将模拟产生的字符串写入日志服务:

// 将数据序列化成日志服务的数据格式
class SimpleLogSerializer implements LogSerializationSchema {

    public RawLogGroup serialize(String element) {
        RawLogGroup rlg = new RawLogGroup();
        RawLog rl = new RawLog();
        rl.setTime((int)(System.currentTimeMillis() / 1000));
        rl.addContent("message", element);
        rlg.addLog(rl);
        return rlg;
    }
}
public class ProducerSample {
    public static String sEndpoint = "cn-hangzhou.log.aliyuncs.com";
    public static String sAccessKeyId = "";
    public static String sAccessKey = "";
    public static String sProject = "ali-cn-hangzhou-sls-admin";
    public static String sLogstore = "test-flink-producer";
    private static final Logger LOG = LoggerFactory.getLogger(ConsumerSample.class);


    public static void main(String[] args) throws Exception {

        final ParameterTool params = ParameterTool.fromArgs(args);
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setGlobalJobParameters(params);
        env.setParallelism(3);

        DataStream simpleStringStream = env.addSource(new EventsGenerator());

        Properties configProps = new Properties();
        // 设置访问日志服务的域名
        configProps.put(ConfigConstants.LOG_ENDPOINT, sEndpoint);
        // 设置访问日志服务的ak
        configProps.put(ConfigConstants.LOG_ACCESSSKEYID, sAccessKeyId);
        configProps.put(ConfigConstants.LOG_ACCESSKEY, sAccessKey);
        // 设置日志写入的日志服务project
        configProps.put(ConfigConstants.LOG_PROJECT, sProject);
        // 设置日志写入的日志服务logStore
        configProps.put(ConfigConstants.LOG_LOGSTORE, sLogstore);

        FlinkLogProducer logProducer = new FlinkLogProducer(new SimpleLogSerializer(), configProps);

        simpleStringStream.addSink(logProducer);

        env.execute("flink log producer");
    }
    // 模拟产生日志
    public static class EventsGenerator implements SourceFunction {
        private boolean running = true;

        @Override
        public void run(SourceContext ctx) throws Exception {
            long seq = 0;
            while (running) {
                Thread.sleep(10);
                ctx.collect((seq++) + "-" + RandomStringUtils.randomAlphabetic(12));
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }
}

2.1 初始化
Producer初始化主要需要做两件事情:

初始化配置参数Properties, 这一步和Consumer类似, Producer有一些定制的参数,一般情况下使用默认值即可,特殊场景可以考虑定制:

// 用于发送数据的io线程的数量,默认是8
ConfigConstants.LOG_SENDER_IO_THREAD_COUNT
// 该值定义日志数据被缓存发送的时间,默认是3000
ConfigConstants.LOG_PACKAGE_TIMEOUT_MILLIS
// 缓存发送的包中日志的数量,默认是4096
ConfigConstants.LOG_LOGS_COUNT_PER_PACKAGE
// 缓存发送的包的大小,默认是3Mb
ConfigConstants.LOG_LOGS_BYTES_PER_PACKAGE
// 作业可以使用的内存总的大小,默认是100Mb
ConfigConstants.LOG_MEM_POOL_BYTES
上述参数不是必选参数,用户可以不设置,直接使用默认值。

重载LogSerializationSchema,定义将数据序列化成RawLogGroup的方法。

RawLogGroup是log的集合,每个字段的含义可以参考文档[日志数据模型](https://help.aliyun.com/document_detail/29054.html)。

如果用户需要使用日志服务的shardHashKey功能,指定数据写到某一个shard中,可以使用LogPartitioner产生数据的hashKey,用法例子如下:

FlinkLogProducer logProducer = new FlinkLogProducer(new SimpleLogSerializer(), configProps);
logProducer.setCustomPartitioner(new LogPartitioner() {
            // 生成32位hash值
            public String getHashKey(String element) {
                try {
                    MessageDigest md = MessageDigest.getInstance("MD5");
                    md.update(element.getBytes());
                    String hash = new BigInteger(1, md.digest()).toString(16);
                    while(hash.length() < 32) hash = "0" + hash;
                    return hash;
                } catch (NoSuchAlgorithmException e) {
                }
                return  "0000000000000000000000000000000000000000000000000000000000000000";
            }
        });

注意LogPartitioner是可选的,不设置情况下, 数据会随机写入某一个shard。

2.2 权限设置:RAM Policy
Producer依赖日志服务的API写数据,如下:

log:PostLogStoreLogs
log:ListShards
当RAM子用户使用Producer时,需要对上述两个API进行授权:

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/51577.html

相关文章

  • Flink实战(八) - Streaming Connectors 编程

    摘要:默认情况下,当数据元到达时,分段接收器将按当前系统时间拆分,并使用日期时间模式命名存储区。如果需要,可以使用数据元或元组的属性来确定目录。这将调用传入的数据元并将它们写入部分文件,由换行符分隔。消费者的消费者被称为或等。 1 概览 1.1 预定义的源和接收器 Flink内置了一些基本数据源和接收器,并且始终可用。该预定义的数据源包括文件,目录和插socket,并从集合和迭代器摄取数据...

    beita 评论0 收藏0
  • OPPO数据中台之基石:基于Flink SQL构建实数据仓库

    摘要:实际上,本身就预留了与外部元数据对接的能力,分别提供了和这两个抽象。对接外部数据源搞清楚了注册库表的过程,给我们带来这样一个思路如果外部元数据创建的表也能被转换成可识别的,那么就能被无缝地注册到。 本文整理自 2019 年 4 月 13 日在深圳举行的 Flink Meetup 会议,分享嘉宾张俊,目前担任 OPPO 大数据平台研发负责人,也是 Apache Flink contrib...

    jeffrey_up 评论0 收藏0
  • Flink1.7稳定版发布:新增功能为企业生产带来哪些好处

    摘要:通过状态演变,可以在状态模式中添加或删除列,以便更改应用程序部署后应捕获的业务功能。本地恢复通过扩展的调度来完成本地恢复功能,以便在恢复时考虑先前的部署位置。此功能大大提高了恢复速度。问题导读1.Flink1.7开始支持Scala哪个版本?2.Flink1.7状态演变在实际生产中有什么好处?3.支持SQL/Table API中的富集连接可以做那些事情?4.Flink1.7新增了哪些连接器Ap...

    Hwg 评论0 收藏0
  • 从 Spark Streaming 到 Apache Flink : 实时数据流在爱奇艺的演进

    摘要:在移动端,爱奇艺月度总有效时长亿小时,稳居中国榜第三名。爱奇艺的峰值事件数达到万秒,在正确性容错性能延迟吞吐量扩展性等方面均遇到不小的挑战。从到爱奇艺主要使用的是和来进行流式计算。作者:陈越晨 整理:刘河 本文将为大家介绍Apache Flink在爱奇艺的生产与实践过程。你可以借此了解到爱奇艺引入Apache Flink的背景与挑战,以及平台构建化流程。主要内容如下: 爱奇艺在实时计算方...

    econi 评论0 收藏0

发表评论

0条评论

endiat

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<