平时ogg数据同步的案例中,遇到最多就是常见RDBMS之间的同步,源端和目标端分别找合适安装包,按照经典的抽取、传输、复制3个进程进行实施,例如oracle到oracle,oracle到mysql,sqlserver到oracle等等场景。早就知道ogg也可以同步数据到Hahoop等BigData平台及Kafka等消息中间件进行同步,今天终于遇到这样的实际需求了。
需求:某系统需要原生实时数据采集到大数据平台。根据统一的实现方案,kafka规划原则:
省分topic隔离。不同省分,不共用相同的topic;
依据省份生产库实例建设情况,按实例划分对接topic,要求kafka一个topic对应一个物理数据库实例;
为保证每张表数据在kafka中保证“有序”,要求每张表对应topic的一个partition;
具体省分kafka规划方案如下:
针对本次环境源端是oracle12c数据库,目标端是Kafka2.11-1.0.2集群。源端使用ogg12cfor oracle即可,目标端我们需要使用OGG_BigData的软件包来实现的。根据官方的版本适配文档,确定使用19c19.1来搭建。
源端软件包
123014_fbo_ggs_Linux_x64_shiphome.zip
(12.3.0.1.4OGGCORE_12.3.0.1.0_PLATFORMS_180415.0359_FBO)
目标端软件包
OGG_BigData_Linux_x64_19.1.0.0.1.zip
(19.1.0.0.2OGGCORE_OGGADP.19.1.0.0.2_PLATFORMS_190916.0039)
对于kafka集群来说,ogg的目标端程序实际上是作为kafka的生产者客户端,把解析trail文件得到的数据推送到kafka中。
源端数据库12cRAC的多租户模式,使用ogg12c12.3的集成模式,按常规配置ex_kaf、dp_kaf进程即可。
抽取进程EX_KAF参数
##view param EX_KAF EXTRACT ex_kaf SETENV (NLS_LANG=AMERICAN_AMERICA.ZHS16GBK) setenv (ORACLE_HOME=/u01/app/oracle/product/12.2.0.1/db_1) SETENV (ORACLE_SID=db2) USERID c##ggs@db, PASSWORD XXXX EXTTRAIL ./dirdat/ha --DISCARDFILE ./dirrpt/ex_kaf.DSC, APPEND,MEGABYTES 100 -- report info REPORTCOUNT EVERY 10 MINUTES, RATE WARNLONGTRANS 2h,CHECKINTERVAL 30m dboptions allowunusedcolumn fetchoptions nousesnapshot LOGALLSUPCOLS //加入前镜像(12c新版本特有参数,遇到josn格式不固定,新老参数同时加) getupdatebefores // 加入前镜像(老版本参数) nocompressdeletes / /加入前镜像(老版本参数) nocompressupdates // 加入前镜像(老版本参数) --crm3hj 208 tables TABLE crm3hj.CUST.TAB; |
##view param DP_KAF EXTRACT dp_kaf PASSTHRU //传输进程透传,不做任何处理 rmthost 192.168.100.100, mgrport 7809 , TCPBUFSIZE 300000, TCPFLUSHBYTES 300000//目标端地址 rmttrail ./dirdat/hk EOFDELAYCSECS 10 --gettruncates --crm3hj 8 tables TABLE crm3hj.CUST.TAB; |
解压ogg压缩包
tar xvfOGG_BigData_Linux_x64_19.1.0.0.1.tar -C /oggdata/ggv191adp
因为要登录到kafka集群,需要引用对应jar包,故需部署
tar -zxvfkafka_2.11-1.0.2.tgz -C /oggdata/kafka
kafka的lib目录为 /oggdata/kafka/kafka_2.11-1.0.2/libs
参数文件配置,这里除了rp_kaf进程参数文件外,还有kafka属性参数文件、生产者属性配置参数文件。
rp_kaf.prm参数文件
[oracle@server003 dirprm]$ cat rp_kaf.prm REPLICAT rp_kaf //定义进程名称 sourcedefs ./dirdef/o2kaf_jt.def //指定使用源和目标的表映射文件,高版本可省略 TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka_crm_ha.props //定义kafka一些适配性的库文件以及配置文件,配置文件指定属性文件 --reperror default, abend reperror default, discard //错误处理,这里将错误信息记录DSC文件 DISCARDFILE ./dirrpt/rp_kaf.DSC, APPEND, MEGABYTES 4096 //对DSC文件的属性定义 REPORTCOUNT EVERY 10 MINUTES, RATE //报告指定10min统计一次报告 --grouptransops 10000 //组提交,以事务传输时,事务合并的单位,减少IO操作 MAP CRM3HJ.CUST.TAB, TARGET CUST. TAB; //具体表的映射配置 |
kafka_crm_ha.props参数文件
[oracle@server003 dirprm]$ cat kafka_crm_ha.props gg.handlerlist=kafkahandler //handler类型 gg.handler.kafkahandler.type=kafka gg.handler.kafkahandler.topicMappingTemplate=tp_share_crm //指定kafka的topic gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer_JT.properties //指定kafka生产者配置文件 gg.handler.kafkahandler.ProducerRecordClass=oracle.goldengate.handler.kafka.MyCreateProducerRecordHa//生产者方法 gg.handler.kafkahandler.BlockingSend=false gg.handler.kafkahandler.includeTokens=false gg.handler.kafkahandler.mode=op //OGG for BigData中传输模式,即op为一次SQL传输一次,tx为一次事务传输一次 gg.handler.kafkahandler.format=json #传输的消息最终解析的格式,格式相关 gg.handler.kafkahandler.format.includePrimaryKeys=true gg.handler.kafkahandler.format.insertOpKey=I gg.handler.kafkahandler.format.updateOpKey=U gg.handler.kafkahandler.format.deleteOpKey=D gg.handler.kafkahandler.authType=kerberos //kerberos安全认证相关 gg.handler.kafkahandler.kerberosKeytabFile=/home/oracle/KDC/kafka_XXXX.keytab gg.handler.kafkahandler.kerberosPrincipal=kafka_XXXX@HADOOP.XXXX.CN goldengate.userexit.timestamp=utc+8 goldengate.userexit.writers=javawriter javawriter.stats.display=true javawriter.stats.full=true javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=ggjava/ggjava.jar -Djava.security.krb5.conf=/etc/krb5.conf -Djava.security.auth.login.config=/home/oracle/kafka_jass.conf //JVM设置 gg.log=log4j //日志记录 gg.log.level=INFO gg.report.time=30sec gg.classpath=dirprm/:/oggdata/kafka/kafka_2.11-1.0.2/libs/*:/oggdata/ggv191adp/:/oggdata/ggv191adp/lib/* #Kafka的lib目录 |
核心参数说明:
custom_kafka_producer.properties参数文件
[oracle@server003 dirprm]$ cat custom_kafka_producer_JT.properties bootstrap.servers=XXXX.COM //kafka集群的broker的地址 acks=1 //参考KAFKA的acks参数 compression.type=gzip //压缩类型 reconnect.backoff.ms=1000 //重连延时 max.request.size=5024000 //请求发送设置 send.buffer.bytes=5024000 value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer batch.size=102400 linger.ms=10000 security.protocol=SASL_PLAINTEXT sasl.kerberos.service.name=kafka // kerberos安全认证相关 saal.machanism=GSSAPI |
测试实践过程中,遇到最大的问题就是默认情况下,ogg会把所有表都放在同一topic下,而根据规范文档,不同的表要对应同一topic下的不同分区。
实际需求是每个表对应一个partition,例如
我们在kafka属性参数文件中,可以指定自定义生产者方法,继承ogg自带的生产者父类,编写自己的生产者方法这样就能实现表与分区的对应关系。
gg.handler.kafkahandler.ProducerRecordClass=MyCreateProducerRecordXX.java
第1步:编写MyCreateProducerRecordXX.java文件
新建MyCreateProducerRecordXX类,实现ogg预定义好的接口方法CreateProducerRecord,编写自定义createProducerRecord方法。
第2步:编译jar包
第3步:替换jar包
[oracle@server003 lib]$ls -rtlh ggkafka-19.1.0.0.1.003.jar
-rwxr-xr-x 1 oracleoinstall 27K Sep 26 03:37 ggkafka-19.1.0.0.1.003.jar
[oracle@server003 lib]$pwd
/oggdata/ggv191adp/ggjava/resources/lib
第4步:指定使用的自定义方法
rp_kaf进程同步的数据,也就是生产的消息其实是json格式的DML操作数据,我们可以使用消费者命令检查查看数据内容:表名、操作类型、更新时间、主键信息、数据行before镜像,数据行after镜像。
1) ogg往kafka传数据大体上还是之前的套路,区别点就在于怎么把复制进程当做客户端,当做生产者去往kafka对应的topic上生产数据。
2) Kafka作为高吞吐量、低延迟、高并发的消息中间件产品,我们的同步进程甚至不需要考虑目标端的性能问题,只要往kafka上推送数据,最终的数据使用则是另一端的消费者程序怎么使用数据的问题。
3) 既然是BigData的adapter软件包,还可以实现往HDFS、Hive、Hbase、ApacheCassandra、MongoDB、Greenplum等多种开源产品中同步数据,基本与kafka的配置类似,自定义类的实现为数据同步提供了更多的可能性,有待尝试。
4) 实际生产中的配置,还涉及到安全认证的问题,KDC的认证在这里省略。
5) 扩展联想一下,如果kafka消费者程序可以连接到不同的数据库、不同的大数据开源组件进行数据的消费,那么就可以形成一个统一的模式,ogg_for_XXDB ogg_for Big DataKAFKA任意数据库。
6) 再联想一下,在kafka上看到json格式里有数据变化的前后镜像,是不是可以结合这个做一个基于ogg的数据操作闪回功能?
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/130014.html
摘要:从零开始设计开发一个日处理数据亿的大数据高并发实时系统,哪些性能问题需要特别注意这里我们一起梳理一下本文中我将以,同学戏称的系统网易云捕设计开发实践中两年的时间里碰到的真实问题,踩过的坑及解决问题的方法和大家一起讨论如何解决这些问题。 本文由作者余宝虹授权网易云社区发布。 从零开始设计开发一个日处理数据8亿的大数据高并发实时系统,哪些性能问题需要特别注意?这里我们一起梳理一下,本文中我...
摘要:慕课网流处理平台学习总结时间年月日星期日说明本文部分内容均来自慕课网。 慕课网《Kafka流处理平台》学习总结 时间:2018年09月09日星期日 说明:本文部分内容均来自慕课网。@慕课网:https://www.imooc.com 教学源码:无 学习源码:https://github.com/zccodere/s... 第一章:课程介绍 1-1 课程介绍 课程介绍 Kafk...
OGG Integrated Native DDL简单测试 img{ display:block; margin:0 auto !important; width:100%; } body{ width:75%;...
阅读 1250·2023-01-11 13:20
阅读 1559·2023-01-11 13:20
阅读 1013·2023-01-11 13:20
阅读 1680·2023-01-11 13:20
阅读 3972·2023-01-11 13:20
阅读 2520·2023-01-11 13:20
阅读 1356·2023-01-11 13:20
阅读 3486·2023-01-11 13:20