ITPub博客

首页 > 大数据 > 数据分析 > GOLDENGATE BIG DATA FOR KFAKA 配置步骤 -表结构相同

GOLDENGATE BIG DATA FOR KFAKA 配置步骤 -表结构相同

原创 数据分析 作者:xhailiang 时间:2017-04-01 23:13:58 0 删除 编辑
GOLDENGATE BIG DATA FOR KFAKA 配置步骤 -表结构相同

环境:



DB 类型 GOLDENGATE 版本
源端 172.16.49.38 ORACLE 11.2.0.4 ADG OGGCORE_12.2.0.1.0 for 11G
目标端 172.16.49.88/89 KAFKA-2.1.0-1.2.1.0.p0.115 OGG_BigData_12.2.0.1.0
zookeeper 172.16.49.243:2181,172.16.49.244:2181,172.16.49.245:2181

软件安装:

需要软件清单:

GOLDENGATE FOR ORACLE
GOLDENGATE FOR BIG DATA
KAFKA
ZOOKEEPER

软件下载:

GOLDENGATE在ORACLE官方网站下载上述对应软件版本;
KAFKA ZOOKEEPER 使用CDH部署,在cloudera下载;

软件安装:

可参考博客里之前文章。
ORACLE ADG 机器安装 GOLDENGATE FOR ORACLE 11G(参考安装手册,步骤省略)
KAFKA机器安装GOLDENGATE FOR BIGDATA 12 (参考安装手册,步骤省略)
Zookeeper Kafka 连接信息:
KAFKA连接信息:bootstrap.servers=172.16.49.88:9092,172.16.49.89:9092
ZOOKEEPER :172.16.49.243:2181,172.16.49.244:2181,172.16.49.245:2181
注意JAVA版本 >=1.7, 设置JAVA_HOME 环境变量
OGGHOME 设置为 /goldengate/gg12c

配置:

目标端准备:

将 /goldengate/gg12c/AdapterExamples/big-data/kafka下 custom_kafka_producer.properties kafka.props 两个文件COPY 到 /goldengate/gg12c/dirprm 目录下。
编辑custom_kafka_producer.properties 添加KAFKA集群连接信息 :
bootstrap.servers=172.16.49.88:9092,172.16.49.89:9092
acks=1
compression.type=gzip
reconnect.backoff.ms=1000
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
# 100KB per partition
batch.size=102400
linger.ms=10000
编辑kafka.props ,内容如下
gg.handlerlist = kafkahandler
gg.handler.kafkahandler.type = kafka
gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties
gg.handler.kafkahandler.format=json --这里采用JSON 格式
gg.handler.kafkahandler.BlockingSend =false
gg.handler.kafkahandler.includeTokens=false
# add by xhl
#Publishing to Multiple Topics BY tablename
gg.handler.kafkahandler.topicPartitioning=table --每个表都创建一个同名 topic
gg.handler.kafkahandler.mode=op --这个必需选择op , 不能选择tx
goldengate.userexit.timestamp=utc
goldengate.userexit.writers=javawriter
javawriter.stats.display=TRUE
javawriter.stats.full=TRUE
gg.log=log4j
gg.log.level=DEBUG --测试阶段可设置为DEBUG ,好查找问题
#gg.log.level=INFO
gg.report.time=30sec
gg.classpath=/goldengate/gg12c/dirprm/:/opt/cloudera/parcels/KAFKA/lib/kafka/libs/*:
javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=ggjava/ggjava.jar

配置manager 进程参数

edit param mgr
PORT 7801
dynamicportlist 7820-7850

添加REPLICAT 进程:

add replicat kafka, exttrail ./dirdat/ka
edit param kafka
REPLICAT kafka
-- Trail file for this example is located in "AdapterExamples/trail" directory
-- Command to add REPLICAT
-- add replicat rkafka, exttrail AdapterExamples/trail/tr
setenv (NLS_LANG=AMERICAN_AMERICA.ZHS16GBK)
TARGETDB LIBFILE libggjava.so SET property=./dirprm/kafka.props
REPORTCOUNT EVERY 1 MINUTES, RATE
GROUPTRANSOPS 10000
MAP PAYADM.KAFKATEST, TARGET PAYADM.KAFKATEST;

源端准备:

添加extract、 pump进程,操作同OGG FOR ORACLE ,这里是从ACTIVE DATAGUARD抽取数据,所以EXTRACT 进程param 文件中要添加 TRANLOGOPTIONS MINEFROMACTIVEDG
extract、 pump进程param 内容如下:
view param ext01
EXTRACT ext01
setenv ( NLS_LANG="SIMPLIFIED CHINESE_CHINA.ZHS16GBK" )
setenv ( ORACLE_SID=uatracdg )
userid goldengate, password golden123
exttrail ./dirdat/et
gettruncates
--dynamicresolution
discardfile ./dirrpt/discardext01.txt, append, megabytes 50
--DBOPTIONS ALLOWUNUSEDCOLUMN
--TRANLOGOPTIONS CONVERTUCS2CLOBS
EOFDELAY 3
getupdatebefores
TRANLOGOPTIONS MINEFROMACTIVEDG --FOR ADG
table payadm.kafkatest;
PUMP进程:
view param kafka
extract kafka
setenv ( NLS_LANG="SIMPLIFIED CHINESE_CHINA.ZHS16GBK" )
userid goldengate password golden123
rmthost 172.16.49.88, mgrport 7801, TCPBUFSIZE 5000000
rmttrail ./dirdat/ka
DYNAMICRESOLUTION
NUMFILES 3000
ALLOCFILES 200
table payadm.kafkatest;
为PAYADM.KAFKATEST表添加TRANDATA,注意这个虽然在ADG库的GOLDENGATE 操作,但在DBLOGIN时需要登陆ADG 的主库
源端:
启动 EXTRACT PUMP
ggsci > start ext01
ggsci > start kafka
目标端:
ggsci > start mgr
ggsci > alter kafka, begin now
ggsci > start kafka
配置完成。

测试:

在源端主库:

insert into payadm.kafkatest values (20,'x');
insert into payadm.kafkatest values (21,'x');
insert into payadm.kafkatest values (22,'x');
insert into payadm.kafkatest values (23,'x');
commit;

到目标查看:

PAYADM.KAFKATEST topic 是否自动创建:
kafka-topics --list --zookeeper 172.16.49.243:2181,172.16.49.244:2181,172.16.49.245:2181/kafka --topic
查看数据是否同步成功:
kafka-console-consumer -zookeeper 172.16.49.245:2181/kafka -topic PAYADM.KAFKATEST -from-beginning

查看 PAYADM.KAFKATEST topic
kafka-topics --describe --zookeeper 172.16.49.245:2181/kafka --topic PAYADM.KAFKATEST

上面测试的是表结构完全一致同步,后续介绍异构表同步的方法,这里需要用到Metadata Provider。

来自 “ ITPUB博客 ” ,链接:http://blog.itpub.net/308563/viewspace-2136482/,如需转载,请注明出处,否则将追究法律责任。

请登录后发表评论 登录
全部评论

注册时间:2008-02-17

  • 博文量
    270
  • 访问量
    425779