资讯专栏INFORMATION COLUMN

ogg同步数据到kafka案例简介

IT那活儿 / 1183人阅读
ogg同步数据到kafka案例简介


点击上方蓝字关注我们


1、前  言


平时ogg数据同步的案例中,遇到最多就是常见RDBMS之间的同步,源端和目标端分别找合适安装包,按照经典的抽取、传输、复制3个进程进行实施,例如oracleoracle,oraclemysql,sqlserveroracle等等场景。早就知道ogg也可以同步数据到HahoopBigData平台及Kafka等消息中间件进行同步,今天终于遇到这样的实际需求了。

需求:某系统需要原生实时数据采集到大数据平台。根据统一的实现方案,kafka规划原则:

  • 省分topic隔离。不同省分,不共用相同的topic

  • 依据省份生产库实例建设情况,按实例划分对接topic,要求kafka一个topic对应一个物理数据库实例;

  • 为保证每张表数据在kafka中保证“有序”,要求每张表对应topic的一个partition

具体省分kafka规划方案如下:


2、背  景


  2.1 GoldenGate for Big Data 19c

从图上可以看到,OGG_BigData_Adapter支持的开源组件还真不少。


  2.2 软件版本

针对本次环境源端是oracle12c数据库,目标端是Kafka2.11-1.0.2集群。源端使用ogg12cfor oracle即可,目标端我们需要使用OGG_BigData的软件包来实现的。根据官方的版本适配文档,确定使用19c19.1来搭建。


源端软件包  

123014_fbo_ggs_Linux_x64_shiphome.zip

12.3.0.1.4OGGCORE_12.3.0.1.0_PLATFORMS_180415.0359_FBO

目标端软件包

OGG_BigData_Linux_x64_19.1.0.0.1.zip

19.1.0.0.2OGGCORE_OGGADP.19.1.0.0.2_PLATFORMS_190916.0039


  2.3 逻辑流程

对于kafka集群来说,ogg的目标端程序实际上是作为kafka的生产者客户端,把解析trail文件得到的数据推送到kafka中。


3、实  施


  3.1 源端部署

源端数据库12cRAC的多租户模式,使用ogg12c12.3的集成模式,按常规配置ex_kafdp_kaf进程即可。

抽取进程EX_KAF参数

##view param EX_KAF


EXTRACT ex_kaf

SETENV (NLS_LANG=AMERICAN_AMERICA.ZHS16GBK)

setenv (ORACLE_HOME=/u01/app/oracle/product/12.2.0.1/db_1)

SETENV (ORACLE_SID=db2)

USERID c##ggs@db, PASSWORD XXXX

EXTTRAIL ./dirdat/ha

--DISCARDFILE ./dirrpt/ex_kaf.DSC, APPEND,MEGABYTES 100


-- report info

REPORTCOUNT EVERY 10 MINUTES, RATE

WARNLONGTRANS 2h,CHECKINTERVAL 30m


dboptions allowunusedcolumn

fetchoptions nousesnapshot

LOGALLSUPCOLS  //加入前镜像(12c新版本特有参数,遇到josn格式不固定,新老参数同时加

getupdatebefores  // 加入前镜像(老版本参数)

nocompressdeletes / /加入前镜像(老版本参数)

nocompressupdates // 加入前镜像(老版本参数)


--crm3hj 208 tables

TABLE crm3hj.CUST.TAB;



传输进程DP_KAF参数


##view param DP_KAF

EXTRACT dp_kaf

PASSTHRU //传输进程透传,不做任何处理

rmthost 192.168.100.100, mgrport 7809 , TCPBUFSIZE 300000, TCPFLUSHBYTES 300000//目标端地址

rmttrail ./dirdat/hk

EOFDELAYCSECS 10


--gettruncates

--crm3hj 8 tables

TABLE crm3hj.CUST.TAB;



  3.2 目标端部署

解压ogg压缩包

tar xvfOGG_BigData_Linux_x64_19.1.0.0.1.tar -C /oggdata/ggv191adp

因为要登录到kafka集群,需要引用对应jar包,故需部署

tar -zxvfkafka_2.11-1.0.2.tgz -C /oggdata/kafka

kafkalib目录为 /oggdata/kafka/kafka_2.11-1.0.2/libs

参数文件配置,这里除了rp_kaf进程参数文件外,还有kafka属性参数文件、生产者属性配置参数文件。


  • rp_kaf.prm参数文件

[oracle@server003 dirprm]$ cat rp_kaf.prm

REPLICAT rp_kaf //定义进程名称

sourcedefs ./dirdef/o2kaf_jt.def  //指定使用源和目标的表映射文件,高版本可省略

TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka_crm_ha.props //定义kafka一些适配性的库文件以及配置文件,配置文件指定属性文件


--reperror default, abend

reperror default, discard  //错误处理,这里将错误信息记录DSC文件

DISCARDFILE ./dirrpt/rp_kaf.DSC, APPEND, MEGABYTES 4096 //DSC文件的属性定义


REPORTCOUNT EVERY 10 MINUTES, RATE   //报告指定10min统计一次报告

--grouptransops 10000  //组提交,以事务传输时,事务合并的单位,减少IO操作


MAP CRM3HJ.CUST.TAB, TARGET CUST. TAB; //具体表的映射配置


  • kafka_crm_ha.props参数文件

[oracle@server003 dirprm]$ cat kafka_crm_ha.props

gg.handlerlist=kafkahandler //handler类型

gg.handler.kafkahandler.type=kafka

gg.handler.kafkahandler.topicMappingTemplate=tp_share_crm //指定kafkatopic

gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer_JT.properties //指定kafka生产者配置文件

gg.handler.kafkahandler.ProducerRecordClass=oracle.goldengate.handler.kafka.MyCreateProducerRecordHa//生产者方法

gg.handler.kafkahandler.BlockingSend=false

gg.handler.kafkahandler.includeTokens=false

gg.handler.kafkahandler.mode=op //OGG for BigData中传输模式,即op为一次SQL传输一次,tx为一次事务传输一次


gg.handler.kafkahandler.format=json #传输的消息最终解析的格式,格式相关

gg.handler.kafkahandler.format.includePrimaryKeys=true

gg.handler.kafkahandler.format.insertOpKey=I

gg.handler.kafkahandler.format.updateOpKey=U

gg.handler.kafkahandler.format.deleteOpKey=D


gg.handler.kafkahandler.authType=kerberos      //kerberos安全认证相关

gg.handler.kafkahandler.kerberosKeytabFile=/home/oracle/KDC/kafka_XXXX.keytab

gg.handler.kafkahandler.kerberosPrincipal=kafka_XXXX@HADOOP.XXXX.CN


goldengate.userexit.timestamp=utc+8

goldengate.userexit.writers=javawriter

javawriter.stats.display=true

javawriter.stats.full=true

javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=ggjava/ggjava.jar -Djava.security.krb5.conf=/etc/krb5.conf -Djava.security.auth.login.config=/home/oracle/kafka_jass.conf //JVM设置


gg.log=log4j  //日志记录

gg.log.level=INFO

gg.report.time=30sec

gg.classpath=dirprm/:/oggdata/kafka/kafka_2.11-1.0.2/libs/*:/oggdata/ggv191adp/:/oggdata/ggv191adp/lib/* #Kafkalib目录


核心参数说明:

  • custom_kafka_producer.properties参数文件

[oracle@server003 dirprm]$ cat custom_kafka_producer_JT.properties

bootstrap.servers=XXXX.COM //kafka集群的broker的地址

acks=1 //参考KAFKAacks参数

compression.type=gzip  //压缩类型

reconnect.backoff.ms=1000 //重连延时


max.request.size=5024000 //请求发送设置

send.buffer.bytes=5024000


value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer

key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer


batch.size=102400

linger.ms=10000


security.protocol=SASL_PLAINTEXT

sasl.kerberos.service.name=kafka // kerberos安全认证相关

saal.machanism=GSSAPI




  3.3 同一topic下表对应partitions如何做

测试实践过程中,遇到最大的问题就是默认情况下,ogg会把所有表都放在同一topic下,而根据规范文档,不同的表要对应同一topic下的不同分区。

实际需求是每个表对应一个partition,例如

我们在kafka属性参数文件中,可以指定自定义生产者方法,继承ogg自带的生产者父类,编写自己的生产者方法这样就能实现表与分区的对应关系。

gg.handler.kafkahandler.ProducerRecordClass=MyCreateProducerRecordXX.java


1步:编写MyCreateProducerRecordXX.java文件

新建MyCreateProducerRecordXX类,实现ogg预定义好的接口方法CreateProducerRecord,编写自定义createProducerRecord方法。

2步:编译jar

3步:替换jar

[oracle@server003 lib]$ls -rtlh ggkafka-19.1.0.0.1.003.jar

-rwxr-xr-x 1 oracleoinstall 27K Sep 26 03:37 ggkafka-19.1.0.0.1.003.jar

[oracle@server003 lib]$pwd

/oggdata/ggv191adp/ggjava/resources/lib

4步:指定使用的自定义方法


  3.4 传到kafka数据的json格式查看

rp_kaf进程同步的数据,也就是生产的消息其实是json格式的DML操作数据,我们可以使用消费者命令检查查看数据内容:表名、操作类型、更新时间、主键信息、数据行before镜像,数据行after镜像。


4、总结


1) ogg往kafka传数据大体上还是之前的套路,区别点就在于怎么把复制进程当做客户端,当做生产者去往kafka对应的topic上生产数据。

2) Kafka作为高吞吐量、低延迟、高并发的消息中间件产品,我们的同步进程甚至不需要考虑目标端的性能问题,只要往kafka上推送数据,最终的数据使用则是另一端的消费者程序怎么使用数据的问题。

3) 既然是BigData的adapter软件包,还可以实现往HDFS、Hive、Hbase、ApacheCassandra、MongoDB、Greenplum等多种开源产品中同步数据,基本与kafka的配置类似,自定义类的实现为数据同步提供了更多的可能性,有待尝试。

4) 实际生产中的配置,还涉及到安全认证的问题,KDC的认证在这里省略。

5) 扩展联想一下,如果kafka消费者程序可以连接到不同的数据库、不同的大数据开源组件进行数据的消费,那么就可以形成一个统一的模式,ogg_for_XXDB ogg_for Big DataKAFKA任意数据库。

6) 再联想一下,在kafka上看到json格式里有数据变化的前后镜像,是不是可以结合这个做一个基于ogg的数据操作闪回功能?


END



文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/130014.html

相关文章

  • 网易云捕性能踩坑解决之道上篇

    摘要:从零开始设计开发一个日处理数据亿的大数据高并发实时系统,哪些性能问题需要特别注意这里我们一起梳理一下本文中我将以,同学戏称的系统网易云捕设计开发实践中两年的时间里碰到的真实问题,踩过的坑及解决问题的方法和大家一起讨论如何解决这些问题。 本文由作者余宝虹授权网易云社区发布。 从零开始设计开发一个日处理数据8亿的大数据高并发实时系统,哪些性能问题需要特别注意?这里我们一起梳理一下,本文中我...

    李义 评论0 收藏0
  • 慕课网_《Kafka流处理平台》学习总结

    摘要:慕课网流处理平台学习总结时间年月日星期日说明本文部分内容均来自慕课网。 慕课网《Kafka流处理平台》学习总结 时间:2018年09月09日星期日 说明:本文部分内容均来自慕课网。@慕课网:https://www.imooc.com 教学源码:无 学习源码:https://github.com/zccodere/s... 第一章:课程介绍 1-1 课程介绍 课程介绍 Kafk...

    Maxiye 评论0 收藏0
  • OGG Integrated Native DDL简单测试

    OGG Integrated Native DDL简单测试 img{ display:block; margin:0 auto !important; width:100%; } body{ width:75%;...

    IT那活儿 评论0 收藏1085

发表评论

0条评论

IT那活儿

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<