Pythian Blog: Technical Track

GoldenGate 12.2 big data adapters: part 3 - Kafka

This post continues my review of GoldenGate Big Data adapters started by review of HDFS and FLUME adapters. Here is list of all posts in the series:
  1. GoldenGate 12.2 Big Data Adapters: part 1 - HDFS
  2. GoldenGate 12.2 Big Data Adapters: part 2 - Flume
  3. GoldenGate 12.2 Big Data Adapters: part 3 - Kafka
In this article I will try the Kafka adapter and see how it works. Firstly, I think it may be worth reminding readers what Kafka is. Kafka is a streaming subscriber-publisher system. One can ask how it is different from Flume, and that question I've asked myself when I've heard about the Kafka. I think one of the best comparisons between Flume and Kafka has been made by Gwen Shapira & Jeff Holoman in the blog post Apache Kafka for Beginners . In essence, Kafka is general purpose system where most of the control and consumer functionality relays on your own built consumer programs. When in Flume you have pre-created sources, sinks, and can use interceptors for changing data. So, in Kafka you are getting on the destination exactly what you put on the source. Kafka and Flume can work together pretty well, and in this article I am going to use them both. Let's recall what we have in our configuration. We have an Oracle database running as a source, and Oracle GoldenGate for Oracle capturing changes for one schema in this database. We have OGG 12.2 and integrated extract on the source. The replication is going directly to trail files on the destination side where we have OGG for BigData installed on a Linux box. You can get more details about the installation on source and target from the first post in the series. I've made configuration as simple as possible dedicating most attention to the Big Data adapters functionality, which is after all the main point of the article. Having installed OGG for Big Data, we need to setup the Kafka adapter. As for other adapters, we are copying the configuration files from $OGG_HOME/AdapterExamples/big-data directory. [code lang="bash"] bash$ cp $OGG_HOME/AdapterExamples/big-data/kafka/* $OGG_HOME/dirdat/ [/code] We need to adjust our kafka.props file to define Kafka/Zookeper topics for data and schema changes (TopicName and SchemaTopicName parameters), and the gg.classpath for Kafka and Avro java classes. I left rest of the parameters default including format for the changes which was defined as "avro_op" in the example. [code lang="bash"] [oracle@sandbox oggbd]$ cat dirprm/kafka.props gg.handlerlist = kafkahandler gg.handler.kafkahandler.type = kafka gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties gg.handler.kafkahandler.TopicName =oggtopic gg.handler.kafkahandler.format =avro_op gg.handler.kafkahandler.SchemaTopicName=mySchemaTopic gg.handler.kafkahandler.BlockingSend =false gg.handler.kafkahandler.includeTokens=false gg.handler.kafkahandler.mode =tx #gg.handler.kafkahandler.maxGroupSize =100, 1Mb #gg.handler.kafkahandler.minGroupSize =50, 500Kb goldengate.userexit.timestamp=utc goldengate.userexit.writers=javawriter javawriter.stats.display=TRUE javawriter.stats.full=TRUE gg.log=log4j gg.log.level=INFO gg.report.time=30sec gg.classpath=dirprm/:/u01/kafka/libs/*:/usr/lib/avro/*: javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=ggjava/ggjava.jar [oracle@sandbox oggbd]$ [/code] The next file we have to correct is custom_kafka_producer.properties which contains information about our running Kafka server and define some addition parameters like compression. I left all the parameters unchanged except "bootstrap.servers" where I put information about my Kafka service. [code lang="bash"] [oracle@sandbox oggbd]$ cat dirprm/custom_kafka_producer.properties bootstrap.servers=sandbox:9092 acks=1 compression.type=gzip reconnect.backoff.ms=1000 value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer # 100KB per partition batch.size=102400 linger.ms=10000 [oracle@sandbox oggbd]$ [/code] If we plan an initial load through Kafka we can use something like that parameter file I prepared for a passive replicat : [code lang="text"] [oracle@sandbox oggbd]$ cat dirprm/irkafka.prm -- Trail file for this example is located in "dirdat" directory -- Command to run passive REPLICAT -- ./replicat paramfile dirprm/irkafka.prm reportfile dirrpt/irkafka.rpt SPECIALRUN END RUNTIME EXTFILE /u01/oggbd/dirdat/initld -- TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka.props REPORTCOUNT EVERY 1 MINUTES, RATE GROUPTRANSOPS 10000 MAP ggtest.*, TARGET bdtest.*; [oracle@sandbox oggbd]$ [/code] Before starting any replicat we need to prepare our system to receive the data. Since the Kafka itself is pure streaming system it cannot pass files to HDFS without other program or connector. In the first case we will be using Kafka passing data to Flume and from Flume will use its sink to HDFS. Please be aware that you need a Zookeeper to manage topics for Kafka. I am not going to discuss setting up Zookeeper in this article, just assume that we have it already and it is up and running on port 2181. I used Kafka version 0.9.0.1 downloading it from https://kafka.apache.org/downloads.html. After downloading the archive I unpacked it, slightly corrected configuration and started it in standalone mode. [code lang="bash"] [root@sandbox u01]# wget https://apache.parentingamerica.com/kafka/0.9.0.1/kafka_2.11-0.9.0.1.tgz --2016-03-15 15:22:09-- https://apache.parentingamerica.com/kafka/0.9.0.1/kafka_2.11-0.9.0.1.tgz Resolving apache.parentingamerica.com... 70.38.15.129 Connecting to apache.parentingamerica.com|70.38.15.129|:80... connected. HTTP request sent, awaiting response... 200 OK Length: 35650542 (34M) [application/x-gzip] Saving to: `kafka_2.11-0.9.0.1.tgz' 100%[=========================================================================================================================================>] 35,650,542 2.95M/s in 16s 2016-03-15 15:22:26 (2.10 MB/s) - `kafka_2.11-0.9.0.1.tgz' saved [35650542/35650542] [root@sandbox u01]# tar xfz kafka_2.11-0.9.0.1.tgz [root@sandbox u01]# ln -s kafka_2.11-0.9.0.1 kafka [root@sandbox u01]# cd kafka [root@sandbox kafka]# vi config/server.properties [root@sandbox kafka]# grep -v '^$\|^\s*\#' config/server.properties broker.id=0 listeners=PLAINTEXT://:9092 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 log.dirs=/tmp/kafka-logs num.partitions=1 num.recovery.threads.per.data.dir=1 log.retention.hours=168 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 log.cleaner.enable=false zookeeper.connect=localhost:2181 zookeeper.connection.timeout.ms=6000 delete.topic.enable=true [root@sandbox kafka]# [root@sandbox kafka]# nohup bin/kafka-server-start.sh config/server.properties > /var/log/kafka/server.log & [1] 30669 [root@sandbox kafka]# nohup: ignoring input and redirecting stderr to stdout [/code] Now we need to prepare our two topics for the data received from the GoldenGate. As you remember we have defined topic "oggdata" for our data flow using parameter gg.handler.kafkahandler.TopicName in our kafka.props file and topic "mySchemaTopic" for schema changes. So, let's create the topic using Kafka's supplemented scripts: [code lang="text"] [root@sandbox kafka]# bin/kafka-topics.sh --zookeeper sandbox:2181 --create --topic oggtopic --partitions 1 --replication-factor 1 SLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/u01/kafka_2.11-0.9.0.1/libs/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: See https://www.slf4j.org/codes.html#multiple_bindings for an explanation. SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory] Created topic "oggtopic". [root@sandbox kafka]# bin/kafka-topics.sh --zookeeper sandbox:2181 --list SLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/u01/kafka_2.11-0.9.0.1/libs/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: See https://www.slf4j.org/codes.html#multiple_bindings for an explanation. SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory] oggtopic [root@sandbox kafka]# [/code] As matter of fact, all the necessary topics will also be created automatically when you start your GoldenGate replicat. You need to create the topic explicitly if you want to use some custom parameters for it. You also have the option to alter the topic later on when setting up configuration parameters. Here is list of the topics we have when one of them is created manually and the second one is created automatically by the replicat process. [code lang="bash"] [root@sandbox kafka]# bin/kafka-topics.sh --zookeeper sandbox:2181 --describe --topic oggtopic SLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/u01/kafka_2.11-0.9.0.1/libs/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: See https://www.slf4j.org/codes.html#multiple_bindings for an explanation. SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory] Topic:oggtopic PartitionCount:1 ReplicationFactor:1 Configs: Topic: oggtopic Partition: 0 Leader: 0 Replicas: 0 Isr: 0 [root@sandbox kafka]# bin/kafka-topics.sh --zookeeper sandbox:2181 --describe --topic mySchemaTopic SLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:/usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/u01/kafka_2.11-0.9.0.1/libs/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: See https://www.slf4j.org/codes.html#multiple_bindings for an explanation. SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory] Topic:mySchemaTopic PartitionCount:1 ReplicationFactor:1 Configs: Topic: mySchemaTopic Partition: 0 Leader: 0 Replicas: 0 Isr: 0 [root@sandbox kafka]# [/code] In our configuration we have only one server and the simplest configuration for Kafka. In a real business case it can be way more complex. Our replicat is going to post data changes to oggtopic, and all changes and definitions for schema to the mySchemaTopic. We've already mentioned that we are going to use Flume functionality to write to HDFS. I've prepared Flume with two sources and sinks to write data changes to the /user/oracle/ggflume HDFS directory. We had an option to split data and schema changes to different directories if we wish it. Here is my configuration for Flume: [code lang="text"] [root@sandbox ~]# cat /etc/flume-ng/conf/flume.conf # Name/aliases for the components on this agent agent.sources = ogg1 ogg2 agent.sinks = hdfs1 hdfs2 agent.channels = ch1 ch2 #Kafka source agent.sources.ogg1.type = org.apache.flume.source.kafka.KafkaSource agent.sources.ogg1.zookeeperConnect = localhost:2181 agent.sources.ogg1.topic = oggtopic agent.sources.ogg1.groupId = flume agent.sources.ogg1.kafka.consumer.timeout.ms = 100 agent.sources.ogg2.type = org.apache.flume.source.kafka.KafkaSource agent.sources.ogg2.zookeeperConnect = localhost:2181 agent.sources.ogg2.topic = mySchemaTopic agent.sources.ogg2.groupId = flume agent.sources.ogg2.kafka.consumer.timeout.ms = 100 # Describe the sink agent.sinks.hdfs1.type = hdfs agent.sinks.hdfs1.hdfs.path = hdfs://sandbox/user/oracle/ggflume agent.sinks.hdfs2.type = hdfs agent.sinks.hdfs2.hdfs.path = hdfs://sandbox/user/oracle/ggflume #agent.sinks.hdfs1.type = logger # Use a channel which buffers events in memory agent.channels.ch1.type = memory agent.channels.ch1.capacity = 1001 agent.channels.ch1.transactionCapacity = 1000 agent.channels.ch2.type = memory agent.channels.ch2.capacity = 1001 agent.channels.ch2.transactionCapacity = 1000 # Bind the source and sink to the channel agent.sources.ogg1.channels = ch1 agent.sources.ogg2.channels = ch2 agent.sinks.hdfs1.channel = ch1 agent.sinks.hdfs2.channel = ch2 [/code] As you can see, we have separate sources for each of our Kafka topics, and we have two sinks pointing to the same HDFS location. The data is going to be written down in Avro format. All preparations are completed, and we are running Kafka server, two topics, and Flume is ready to write data to HDFS. Our HDFS directory is still empty. [code lang="bash"] [oracle@sandbox oggbd]$ hadoop fs -ls /user/oracle/ggflume/ [oracle@sandbox oggbd]$ [/code] Let's run the passive replicat with our initial data load trail file : [code lang="bash"] [oracle@sandbox oggbd]$ cd /u01/oggbd [oracle@sandbox oggbd]$ ./replicat paramfile dirprm/irkafka.prm reportfile dirrpt/irkafka.rpt [oracle@sandbox oggbd]$ [/code] Now we can have a look to results. We got 3 files on HDFS where first two files describe structure for the TEST_TAB_1 and TEST_TAB_2 accordingly, and the third file contains the data changes, or maybe better to say initial data for those tables. You may see that the schema definition was put on separate files when the data changes were posted altogether to the one file. [code lang="text"] [oracle@sandbox ~]$ hadoop fs -ls /user/oracle/ggflume/ Found 3 items -rw-r--r-- 1 flume oracle 1833 2016-03-23 12:14 /user/oracle/ggflume/FlumeData.1458749691685 -rw-r--r-- 1 flume oracle 1473 2016-03-23 12:15 /user/oracle/ggflume/FlumeData.1458749691686 -rw-r--r-- 1 flume oracle 981 2016-03-23 12:15 /user/oracle/ggflume/FlumeData.1458749691718 [oracle@sandbox ~]$ [oracle@sandbox ~]$ hadoop fs -cat /user/oracle/ggflume/FlumeData.1458749691685 SEQ!org.apache.hadoop.io.LongWritable"org.apache.hadoop.io.BytesWritable?????k?\??????S?A?%?{ "type" : "record", "name" : "TEST_TAB_1", "namespace" : "BDTEST", "fields" : [ { "name" : "table", "type" : "string" ......................... [oracle@sandbox ~]$ hadoop fs -cat /user/oracle/ggflume/FlumeData.1458749691686 SEQ!org.apache.hadoop.io.LongWritable"org.apache.hadoop.io.BytesWritable?* ?e????xS?A?%N{ "type" : "record", "name" : "TEST_TAB_2", "namespace" : "BDTEST", "fields" : [ { "name" : "table", "type" : "string" }, { ............................... [oracle@sandbox ~]$hadoop fs -cat /user/oracle/ggflume/FlumeData.1458749691718 SEQ!org.apache.hadoop.io.LongWritable"org.apache.hadoop.io.BytesWritable??????c?C n??S?A?b"BDTEST.TEST_TAB_1I42016-02-16 19:17:40.74669942016-03-23T12:14:35.373000(00000000-10000002012 PK_ID1371O62FX&2014-01-24:19:09:20RJ68QYM5&2014-01-22:12:14:30"BDTEST.TEST_TAB_1I42016-02-16 19:17:40.74669942016-03-23T12:14:35.405000(00000000-10000002155 PK_ID2371O62FX&2014-01-24:19:09:20HW82LI73&2014-05-11:05:23:23"BDTEST.TEST_TAB_1I42016-02-16 19:17:40.74669942016-03-23T12:14:35.405001(00000000-10000002298 PK_ID3RXZT5VUN&2013-09-04:23:32:56RJ68QYM5&2014-01-22:12:14:30"BDTEST.TEST_TAB_1I42016-02-16 19:17:40.74669942016-03-23T12:14:35.405002(00000000-10000002441 PK_ID4RXZT5VUN&2013-09-04:23:32:56HW82LI73&2014-05-11:05:23:23"BDTEST.TEST_TAB_2I42016-02-16 19:17:40.76289942016-03-23T12:14:35.408000(00000000-10000002926 PK_IDRND_STR_1ACC_DATE7IJWQRO7T&2013-07-07:08:13:52[oracle@sandbox ~]$ [/code] Now we need to create our ongoing replication. Our extract was set up the same way as it was described in the first post of the series. It is up and running, passing changes to the replicat side to the directory ./dirdat [code lang="text"] GGSCI (sandbox.localdomain) 1> info all Program Status Group Lag at Chkpt Time Since Chkpt MANAGER RUNNING EXTRACT RUNNING GGEXT 00:00:09 00:00:03 [oracle@sandbox oggbd]$ ls -l dirdat/ total 240 -rw-r-----. 1 oracle oinstall 3028 Feb 16 14:17 initld -rw-r-----. 1 oracle oinstall 190395 Mar 14 13:00 or000041 -rw-r-----. 1 oracle oinstall 1794 Mar 15 12:02 or000042 -rw-r-----. 1 oracle oinstall 43222 Mar 17 11:53 or000043 [oracle@sandbox oggbd]$ [/code] I've prepared parameter file for the Kafka replicat : [code lang="text"] [oracle@sandbox oggbd]$ cat dirprm/rkafka.prm REPLICAT rkafka -- Trail file for this example is located in "AdapterExamples/trail" directory -- Command to add REPLICAT -- add replicat rkafka, exttrail dirdat/or, begin now TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka.props REPORTCOUNT EVERY 1 MINUTES, RATE GROUPTRANSOPS 10000 MAP GGTEST.*, TARGET BDTEST.*; [oracle@sandbox oggbd]$ [/code] We need only add and start our rkafka replica for the Big Data GoldenGate. [code lang="text"] GGSCI (sandbox.localdomain) 1> add replicat rkafka, exttrail dirdat/or, begin now REPLICAT added. GGSCI (sandbox.localdomain) 2> start replicat rkafka Sending START request to MANAGER ... REPLICAT RKAFKA starting GGSCI (sandbox.localdomain) 3> info rkafka REPLICAT RKAFKA Last Started 2016-03-24 11:53 Status RUNNING Checkpoint Lag 00:00:00 (updated 00:00:06 ago) Process ID 21041 Log Read Checkpoint File dirdat/or000000000 2016-03-24 11:53:17.388078 RBA 0 [/code] You may remember that we don't have dirdat/or000000000 file in our dirdat directory. So, our replicat has to be slightly corrected to work with proper trail files. I am altering sequence for my replicat to reflect actual sequence number for my last trail file. [code lang="text"] GGSCI (sandbox.localdomain) 10> stop replicat rkafka Sending STOP request to REPLICAT RKAFKA ... Request processed. GGSCI (sandbox.localdomain) 11> alter replicat rkafka EXTSEQNO 43 2016-03-24 12:03:27 INFO OGG-06594 Replicat RKAFKA has been altered through GGSCI. Even the start up position might be updated, duplicate suppression remains active in next startup. To override duplicate suppression, start RKAFKA with NOFILTERDUPTRANSACTIONS option. REPLICAT altered. GGSCI (sandbox.localdomain) 12> start replicat rkafka Sending START request to MANAGER ... REPLICAT RKAFKA starting GGSCI (sandbox.localdomain) 13> info rkafka REPLICAT RKAFKA Last Started 2016-03-24 12:03 Status RUNNING Checkpoint Lag 00:00:00 (updated 00:00:12 ago) Process ID 21412 Log Read Checkpoint File dirdat/or000000043 First Record RBA 0 GGSCI (sandbox.localdomain) 14> [/code] Let's change some data: [code lang="SQL"] orclbd> select * from test_tab_2; PK_ID RND_STR_1 ACC_DATE ---------------- ---------- --------------------------- 7 IJWQRO7T 07/07/13 08:13:52 orclbd> insert into test_tab_2 values (8,'TEST_INS1',sysdate); 1 row inserted. orclbd> commit; Commit complete. orclbd> [/code] [code lang="text"] [oracle@sandbox oggbd]$ hadoop fs -ls /user/oracle/ggflume/ Found 5 items -rw-r--r-- 1 flume oracle 1833 2016-03-23 12:14 /user/oracle/ggflume/FlumeData.1458749691685 -rw-r--r-- 1 flume oracle 1473 2016-03-23 12:15 /user/oracle/ggflume/FlumeData.1458749691686 -rw-r--r-- 1 flume oracle 981 2016-03-23 12:15 /user/oracle/ggflume/FlumeData.1458749691718 -rw-r--r-- 1 flume oracle 278 2016-03-24 12:18 /user/oracle/ggflume/FlumeData.1458836268086 -rw-r--r-- 1 flume oracle 1473 2016-03-24 12:18 /user/oracle/ggflume/FlumeData.1458836268130 [oracle@sandbox oggbd]$ [oracle@sandbox oggbd]$ hadoop fs -cat /user/oracle/ggflume/FlumeData.1458836268086 SEQ!org.apache.hadoop.io.LongWritable"org.apache.hadoop.io.BytesWritable?Q???n?y?1?R#S?j???"BDTEST.TEST_TAB_2I42016-03-24 16:17:29.00033642016-03-24T12:17:31.733000(00000000430000043889 PK_IDRND_STR_1ACC_DATE8TEST_INS1&2016-03-24:12:17:26[oracle@sandbox oggbd]$ [oracle@sandbox oggbd]$ hadoop fs -cat /user/oracle/ggflume/FlumeData.1458836268130 SEQ!org.apache.hadoop.io.LongWritable"org.apache.hadoop.io.BytesWritable?6F!?Z?-?ZA8r^S?j?oN{ "type" : "record", "name" : "TEST_TAB_2", "namespace" : "BDTEST", [/code] We got our schema definition file and a file with data changes. [code lang="SQL"] orclbd> update test_tab_2 set RND_STR_1='TEST_UPD1' where pk_id=8; 1 row updated. orclbd> commit; Commit complete. orclbd> [/code] [code lang="text"] [oracle@sandbox oggbd]$ hadoop fs -ls /user/oracle/ggflume/ Found 6 items -rw-r--r-- 1 flume oracle 1833 2016-03-23 12:14 /user/oracle/ggflume/FlumeData.1458749691685 -rw-r--r-- 1 flume oracle 1473 2016-03-23 12:15 /user/oracle/ggflume/FlumeData.1458749691686 -rw-r--r-- 1 flume oracle 981 2016-03-23 12:15 /user/oracle/ggflume/FlumeData.1458749691718 -rw-r--r-- 1 flume oracle 278 2016-03-24 12:18 /user/oracle/ggflume/FlumeData.1458836268086 -rw-r--r-- 1 flume oracle 1473 2016-03-24 12:18 /user/oracle/ggflume/FlumeData.1458836268130 -rw-r--r-- 1 flume oracle 316 2016-03-24 12:28 /user/oracle/ggflume/FlumeData.1458836877420 [oracle@sandbox oggbd]$ hadoop fs -cat /user/oracle/ggflume/FlumeData.1458836877420 SEQ!org.apache.hadoop.io.LongWritable"org.apache.hadoop.io.BytesWritable]??u????????qS?t,??"BDTEST.TEST_TAB_2U42016-03-24 16:27:39.00035642016-03-24T12:27:42.177000(00000000430000044052 PK_IDRND_STR_1ACC_DATE8TEST_INS1&2016-03-24:12:17:268TEST_UPD1&2016-03-24:12:17:26[oracle@sandbox oggbd]$ [/code] You can see that we only got a file with data changes since no DDL changes were made. The transactions will be grouped to the files according to our Flume parameters as we discussed in the previous blog post. You can also see old value for the updated record and the new one. Using that information we can reconstruct the changes, but we need to apply certain logic to decrypt the changes. For deletion operation we are getting operation flag "F" and values for the deleted record. Again, no schema definition file since no changes were made. Let's try some DDL. [code lang="SQL"] orclbd> truncate table test_tab_2; Table TEST_TAB_2 truncated. orclbd> [/code] [code lang="text"] GGSCI (sandbox.localdomain) 4> info rkafka REPLICAT RKAFKA Last Started 2016-03-24 12:10 Status RUNNING Checkpoint Lag 00:00:00 (updated 00:00:02 ago) Process ID 21803 Log Read Checkpoint File dirdat/or000043 2016-03-24 12:40:05.000303 RBA 45760 GGSCI (sandbox.localdomain) 5> [/code] No new files on HDFS. [code lang="SQL"] orclbd> insert into test_tab_2 select * from test_tab_3; 1 row inserted. orclbd> commit; Commit complete. orclbd> [/code] [code lang="text"] [oracle@sandbox oggbd]$ hadoop fs -ls /user/oracle/ggflume/ Found 8 items -rw-r--r-- 1 flume oracle 1833 2016-03-23 12:14 /user/oracle/ggflume/FlumeData.1458749691685 -rw-r--r-- 1 flume oracle 1473 2016-03-23 12:15 /user/oracle/ggflume/FlumeData.1458749691686 -rw-r--r-- 1 flume oracle 981 2016-03-23 12:15 /user/oracle/ggflume/FlumeData.1458749691718 -rw-r--r-- 1 flume oracle 278 2016-03-24 12:18 /user/oracle/ggflume/FlumeData.1458836268086 -rw-r--r-- 1 flume oracle 1473 2016-03-24 12:18 /user/oracle/ggflume/FlumeData.1458836268130 -rw-r--r-- 1 flume oracle 316 2016-03-24 12:28 /user/oracle/ggflume/FlumeData.1458836877420 -rw-r--r-- 1 flume oracle 278 2016-03-24 12:35 /user/oracle/ggflume/FlumeData.1458837310570 -rw-r--r-- 1 flume oracle 277 2016-03-24 12:42 /user/oracle/ggflume/FlumeData.1458837743709 [oracle@sandbox oggbd]$ hadoop fs -cat /user/oracle/ggflume/FlumeData.1458837743709 SEQ!org.apache.hadoop.io.LongWritable"org.apache.hadoop.io.BytesWritable*?2??????>iS??\??"BDTEST.TEST_TAB_2I42016-03-24 16:42:04.00020042016-03-24T12:42:06.774000(00000000430000045760 PK_IDRND_STR_1ACC_DATE7IJWQRO7T&2013-07-07:08:13:52[oracle@sandbox oggbd]$ [/code] Again, we got only file with data changes. I tried to compare the file we were getting for the previous insert and insert after truncate, but couldn't find difference except for the binary part of the avro file. It will require additional investigation and maybe clarification from Oracle. In the current state it looks like it is easy to miss a truncate command for a table on the destination side. Let us change the table and add a column there. [code lang="SQL"] orclbd> alter table test_tab_2 add test_col varchar2(10); Table TEST_TAB_2 altered. orclbd> [/code] We are not getting any new files with new table definitions until we do any DML on the table. Both files (with the new schema definition and data changes) will appear after we insert, delete or update any rows there. [code lang="SQL"] orclbd> insert into test_tab_2 values (8,'TEST_INS1',sysdate,'TEST_ALTER'); 1 row inserted. orclbd> commit; Commit complete. orclbd> [/code] [code lang="text"] [oracle@sandbox oggbd]$ hadoop fs -ls /user/oracle/ggflume/ Found 10 items ................................................... -rw-r--r-- 1 flume oracle 1654 2016-03-24 12:56 /user/oracle/ggflume/FlumeData.1458838582020 -rw-r--r-- 1 flume oracle 300 2016-03-24 12:56 /user/oracle/ggflume/FlumeData.1458838584891 [oracle@sandbox oggbd]$ hadoop fs -cat /user/oracle/ggflume/FlumeData.1458838582020 SEQ!org.apache.hadoop.io.LongWritable"org.apache.hadoop.io.BytesWritable-??ip??/?w?S??/{ "type" : "record", "name" : "TEST_TAB_2", "namespace" : "BDTEST", ................ "name" : "TEST_COL", "type" : [ "null", "string" ], "default" : null ................. [oracle@sandbox oggbd]$ hadoop fs -cat /user/oracle/ggflume/FlumeData.1458838584891 SEQ!org.apache.hadoop.io.LongWritable"org.apache.hadoop.io.BytesWritabletr?V?_$???:2??S??/w?"BDTEST.TEST_TAB_2I42016-03-24 16:56:04.00026042016-03-24T12:56:08.370000(00000000430000047682 PK_IDRND_STR_1ACC_DATETEST_COL8TEST_INS1&2016-03-24:12:56:01TEST_ALTER [/code] I used JMeter to generate some load, and it could easily with almost no delays, replicate 225 transactions per second (30% inserts 80% updates). It was not a test for Kafka or Flume, which could sustain way more load, but rather combination of GoldenGate with the Big Data infrastructure. It was stable without any errors. I do understand that the current test is very far from any potential production workflow which may include Oracle Database (or any other RDBMS) + GoldenGate + Kafka + Storm + .... . And maybe the final data format will be completely different. So far the adapters are looking good and doing the job. In the next post I will observe the HBASE adapter. Stay tuned.

No Comments Yet

Let us know what you think

Subscribe by email