资讯专栏INFORMATION COLUMN

开源组件Flink关于redisSink业务需求改造分享

IT那活儿 / 2617人阅读
开源组件Flink关于redisSink业务需求改造分享
[
背景
]


由于项目需求,用Flink生成完实时数据之后,需要立即给前台做展示,且日志项目实时数据数量较大,因此采用了Redis作为数据库,用来存储最近一小段时间的数据,供前台来进行实时展示。


[
问题
]


Flink的Sink方法提供了一个redis的sink包(flink-connector-redis_2.11),但是使用过程中发现与业务不符合。主要是2个方面:


  • Flink提供的包入redis时不能设置失效时间,业务场景是只需要保存最新的实时数据一段时间,需要设置失效时间,让数据过期,从而提高资源的利用和性能。


  • 业务使用的是Redis集群模式,Flink提供的Jar包虽然支持集群但是没有设置密码的地方,业务的Redis集群都有设置密码的强制要求。


[
解决方案
]


对Flink提供RedisSink包进行改造,具体的是:1.增加可以设置失效时间的方法;2.集群模式提供Redis的验证方法。


1、增加缺失的方法

由于原来的RedisMapper只提供了获取入库方式、获取key以及获取Value,三个接口,没有提供获取失效时间的接口。


所以首先重新定义RedisMapper接口类,增加一个获取失效时间的接口。


其次在RedisCommand中增加一个带失效时间的命令

SETEX(BasicRedisDataType.STRING)接口既然以及改好了,就要去重新定义它的实现了。


接着在RedisSink的invoke方法中提供获取失效时间的代码,以及SETNX命令的实现。


由于原来的RedisCommandsContainer接口中不包含失效时间的方法,所以需要新增一个含失效时间的接口。


在具体的RedisClusterContainer集群实现该方法。


这样第一个问题就解决了,只要将设置为RedisCommand.SETEX就可以进行带失效时间的Sink方法了。


2、增加缺失的参数

关于增加设置密码的方式,Flink提供的包中JedisClusterConfig是缺失了集群密码设置的。因此需要添加一个密码选项,并提供set方法。


然后初始化的时候增加一个构造方法。


其实最终还是调用的原生redis连接包来创建了一个集群对象redis.clients.jedis.JedisCluster,在RedisCommandsContainerBuilder中添加带密码的redis集群创建方式。


通过这样改造就拥有了设置密码的方式,如果还少了别的参数同理可以通过这样的方式给添加上去。为了灵活改造我们实际直接将整个jar拉取了下来,重新定义了一个属于自己的Sink定制包,最终使用如下:


简要的给出类图


[
总结
]


本文主要通过结合项目的实际使用场景,在flink与redis的sink包(flink-connector-redis_2.11)不能满足要求的情况下,在原有包的基础上根据实际使用情况,对该包进行了一系列的改造,主要包括提供设置密码的redis集群初始化以及在sink过程中可以指定redis键值的失效时长,最终通过改造的flink-connector-redis在项目flink任务中使用良好。

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/130151.html

相关文章

  • Apache Flink,流计算?不仅仅是流计算!

    摘要:基于流处理机制实现批流融合相对基于批处理机制实现批流融合的思想更自然,更合理,也更有优势,因此阿里巴巴在基于支持大量核心实时计算场景的同时,也在不断改进的架构,使其朝着真正批流融合的统一计算引擎方向前进。 阿里妹导读:2018年12月下旬,由阿里巴巴集团主办的Flink Forward China在北京国家会议中心举行。Flink Forward是由Apache软件基金会授权的全球范围...

    KoreyLee 评论0 收藏0
  • 取之开源,用之开源——深度剖析阿里巴巴对Apache Flink的优化与改进

    摘要:基于在阿里巴巴搭建的平台于年正式上线,并从阿里巴巴的搜索和推荐这两大场景开始实现。在经过一番调研之后,阿里巴巴实时计算认为是一个非常适合的选择。接下来,我们聊聊阿里巴巴在层对又大刀阔斧地进行了哪些改进。 Apache Flink 概述 Apache Flink(以下简称Flink)是诞生于欧洲的一个大数据研究项目,原名StratoSphere。该项目是柏林工业大学的一个研究性项目,早期...

    YJNldm 评论0 收藏0
  • OPPO数据中台之基石:基于Flink SQL构建实数据仓库

    摘要:实际上,本身就预留了与外部元数据对接的能力,分别提供了和这两个抽象。对接外部数据源搞清楚了注册库表的过程,给我们带来这样一个思路如果外部元数据创建的表也能被转换成可识别的,那么就能被无缝地注册到。 本文整理自 2019 年 4 月 13 日在深圳举行的 Flink Meetup 会议,分享嘉宾张俊,目前担任 OPPO 大数据平台研发负责人,也是 Apache Flink contrib...

    jeffrey_up 评论0 收藏0

发表评论

0条评论

IT那活儿

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<