
GoldenGate 12.2 big data adapters: part 2 - FLUME

In this blog post we continue our review of the new Oracle GoldenGate Big Data adapters. In the first part of the series I tested the basic HDFS adapter and checked how it worked with some DML and DDL. In this article I will try the Flume adapter and see how it works.

A quick reminder on what Flume is. This is not a post about the popular Australian musician; today we are talking about Apache Flume. In short, it is a pipeline, or a kind of streaming system, that allows you to move large amounts of data. It has a simple architecture with, in general, three main components:

a) Source: where data enters Flume from an outside system.
b) Sink: responsible for passing data to the destination system, whether it is the final destination or another flow.
c) Channel: connects the Source and Sink.

I know that this is a rather simplistic description, but the main subject of this article is not what Flume can do, but how we can pass our data from Oracle to Flume using GoldenGate.

My first post discussed how to set up an Oracle source system and how to start the GoldenGate initial load and extract, so I am not repeating it here. Let's assume we have the source system: an Oracle database replicating DML and DDL for one particular schema, GGTEST, using Oracle GoldenGate 12.2 to write trail files on the box where we already have GoldenGate for Big Data. Have a look at the first part to see how to set up GoldenGate for Big Data (OGG BD). So, we have our OGG BD setup and the manager up and running.

[code lang="text"]
GGSCI (sandbox.localdomain) 1> info manager

Manager is running (IP port sandbox.localdomain.7839, Process ID 18521).


GGSCI (sandbox.localdomain) 2>
[/code]

What we need now is to prepare our Flume agent to accept messages from OGG. I've already set up the flume-ng agent service on my Linux box, and now we need to prepare a configuration file for the agent to handle the incoming stream and pass it to the destination system. We will set our source to "avro", and the sink will write to HDFS. The source can be either Avro or Thrift: according to the Oracle documentation, the Flume handler can stream data from a trail file to Avro or Thrift RPC Flume sources. I have to admit that HDFS as the destination looks quite artificial, since we have a dedicated adapter for HDFS and don't need Flume to write there. But such a configuration can help us compare the different adapters and what they can do.
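To keep the moving parts straight, here is a summary sketch of the flow we are going to build in this post (my own diagram, based on the configuration that follows):

[code lang="text"]
Oracle DB (GGTEST schema)
  -> OGG 12.2 extract -> trail files
    -> OGG for Big Data Flume handler (Avro/Thrift RPC client)
      -> Flume source "ogg1" (avro, port 4141)
        -> Flume channel "ch1" (memory)
          -> Flume sink "hdfs1" (hdfs://sandbox/user/oracle/ggflume)
[/code]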
I used Flume version 1.6.0:

[code lang="text"]
[oracle@sandbox flume-ng]$ bin/flume-ng version
Flume 1.6.0
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: 2561a23240a71ba20bf288c7c2cda88f443c2080
Compiled by hshreedharan on Mon May 11 11:15:44 PDT 2015
From source with checksum b29e416802ce9ece3269d34233baf43f
[oracle@sandbox flume-ng]$
[/code]

Here is my configuration file for the Flume agent:

[code lang="text"]
# Name/aliases for the components on this agent
agent.sources = ogg1
agent.sinks = hdfs1
agent.channels = ch1

# Avro source
agent.sources.ogg1.type = avro
agent.sources.ogg1.bind = 0.0.0.0
agent.sources.ogg1.port = 4141

# Describe the sink
agent.sinks.hdfs1.type = hdfs
agent.sinks.hdfs1.hdfs.path = hdfs://sandbox/user/oracle/ggflume
#agent.sinks.hdfs1.type = logger

# Use a channel which buffers events in memory
agent.channels.ch1.type = memory
agent.channels.ch1.capacity = 100000
agent.channels.ch1.transactionCapacity = 10000

# Bind the source and sink to the channel
agent.sources.ogg1.channels = ch1
agent.sinks.hdfs1.channel = ch1
[/code]

I've kept the configuration simple and clear. You may need to change agent.sources.ogg1.port and agent.sinks.hdfs1.hdfs.path depending on your system. On the target HDFS we have to create the directory defined in our sink configuration:

[code lang="bash"]
[oracle@sandbox ~]$ hadoop fs -mkdir /user/oracle/ggflume
[oracle@sandbox ~]$ hadoop fs -ls /user/oracle/ggflume
[oracle@sandbox ~]$
[/code]

We can start our Flume agent now:

[code lang="bash"]
[root@sandbox conf]# service flume-ng-agent start
Starting Flume NG agent daemon (flume-ng-agent):           [  OK  ]
[root@sandbox conf]# service flume-ng-agent status
Flume NG agent is running                                  [  OK  ]
[root@sandbox conf]#
[root@sandbox conf]# tail /var/log/flume-ng/flume.log
25 Feb 2016 11:56:37,113 INFO  [lifecycleSupervisor-1-0] (org.apache.flume.instrumentation.MonitoredCounterGroup.register:120)  - Monitored counter group for type: CHANNEL, name: ch1: Successfully registered new MBean.
25 Feb 2016 11:56:37,121 INFO  [lifecycleSupervisor-1-0] (org.apache.flume.instrumentation.MonitoredCounterGroup.start:96)  - Component type: CHANNEL, name: ch1 started
25 Feb 2016 11:56:37,122 INFO  [conf-file-poller-0] (org.apache.flume.node.Application.startAllComponents:173)  - Starting Sink hdfs1
25 Feb 2016 11:56:37,123 INFO  [conf-file-poller-0] (org.apache.flume.node.Application.startAllComponents:184)  - Starting Source ogg1
25 Feb 2016 11:56:37,139 INFO  [lifecycleSupervisor-1-0] (org.apache.flume.source.AvroSource.start:228)  - Starting Avro source ogg1: { bindAddress: 0.0.0.0, port: 4141 }...
25 Feb 2016 11:56:37,146 INFO  [lifecycleSupervisor-1-2] (org.apache.flume.instrumentation.MonitoredCounterGroup.register:120)  - Monitored counter group for type: SINK, name: hdfs1: Successfully registered new MBean.
25 Feb 2016 11:56:37,147 INFO  [lifecycleSupervisor-1-2] (org.apache.flume.instrumentation.MonitoredCounterGroup.start:96)  - Component type: SINK, name: hdfs1 started
25 Feb 2016 11:56:38,114 INFO  [lifecycleSupervisor-1-0] (org.apache.flume.instrumentation.MonitoredCounterGroup.register:120)  - Monitored counter group for type: SOURCE, name: ogg1: Successfully registered new MBean.
25 Feb 2016 11:56:38,115 INFO  [lifecycleSupervisor-1-0] (org.apache.flume.instrumentation.MonitoredCounterGroup.start:96)  - Component type: SOURCE, name: ogg1 started
25 Feb 2016 11:56:38,116 INFO  [lifecycleSupervisor-1-0] (org.apache.flume.source.AvroSource.start:253)  - Avro source ogg1 started.
[root@sandbox conf]#
[/code]
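Before wiring OGG to the agent, it may be worth a quick sanity check that the Avro source is actually listening on the configured port. This is a generic Linux check, not part of the original setup:

[code lang="bash"]
# The Avro source should appear as a LISTEN socket on port 4141
# (adjust the port if you changed agent.sources.ogg1.port).
netstat -tlnp | grep 4141
[/code]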
Flume is ready, and we can now prepare our OGG configuration. We have examples of the Flume adapter configuration files in $OGG_HOME/AdapterExamples/big-data/flume/ :

[code lang="text"]
[oracle@sandbox oggbd]$ ll AdapterExamples/big-data/flume/
total 12
-rw-r--r--. 1 oracle oinstall 107 Dec  9 12:56 custom-flume-rpc.properties
-r-xr-xr-x. 1 oracle oinstall 812 Dec  9 12:56 flume.props
-rw-r--r--. 1 oracle oinstall 332 Dec  9 12:56 rflume.prm
[oracle@sandbox oggbd]$
[/code]

We can copy the examples to our configuration directory and adjust them to our needs:

[code lang="text"]
[oracle@sandbox oggbd]$ cp AdapterExamples/big-data/flume/* dirprm/
[/code]

Here is the configuration file for our adapter:

[code lang="text"]
[oracle@sandbox oggbd]$ cat dirprm/flume.props
gg.handlerlist = flumehandler
gg.handler.flumehandler.type=flume
gg.handler.flumehandler.RpcClientPropertiesFile=custom-flume-rpc.properties
gg.handler.flumehandler.format=avro_op
gg.handler.flumehandler.mode=tx
#gg.handler.flumehandler.maxGroupSize=100, 1Mb
#gg.handler.flumehandler.minGroupSize=50, 500 Kb
gg.handler.flumehandler.EventMapsTo=tx
gg.handler.flumehandler.PropagateSchema=true
gg.handler.flumehandler.includeTokens=false
gg.handler.flumehandler.format.WrapMessageInGenericAvroMessage=true

goldengate.userexit.timestamp=utc
goldengate.userexit.writers=javawriter
javawriter.stats.display=TRUE
javawriter.stats.full=TRUE

gg.log=log4j
gg.log.level=INFO

gg.report.time=30sec

gg.classpath=dirprm/:/usr/lib/flume-ng/lib/*:

javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=ggjava/ggjava.jar
[/code]

You will need to adjust gg.classpath for your system, as it has to include the Flume Java classes and the file with the Flume source properties (the custom-flume-rpc.properties file). Here is my example of the custom-flume-rpc.properties file, which will be used by the OGG adapter to connect to the flume-ng agent. I've placed it in the dirprm directory along with the other parameter files.

[code lang="text"]
[oracle@sandbox oggbd]$ cat dirprm/custom-flume-rpc.properties
client.type=default
hosts=h1
hosts.h1=localhost:4141
batch-size=100
connect-timeout=20000
request-timeout=20000
[/code]

As you can see, my flume-ng agent is on the same host as OGG, which may not be the case for you. If your agent runs elsewhere, you will need to provide the hostname and port of your running flume-ng agent, as shown in the sketch below.
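As an illustration, a remote-agent variant of the same properties file could look like the following; the hostname and port are placeholders, not values from my environment:

[code lang="text"]
# Hypothetical example: flume-ng agent running on a different host.
# Replace flume-host.example.com:4141 with the address of your agent.
client.type=default
hosts=h1
hosts.h1=flume-host.example.com:4141
batch-size=100
connect-timeout=20000
request-timeout=20000
[/code]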
[code lang="text"] [root@sandbox ~]# hadoop fs -ls /user/oracle/ggflume Found 12 items -rw-r--r-- 1 flume oracle 1833 2016-03-10 11:17 /user/oracle/ggflume/FlumeData.1457626634620 -rw-r--r-- 1 flume oracle 1762 2016-03-10 11:17 /user/oracle/ggflume/FlumeData.1457626634621 -rw-r--r-- 1 flume oracle 1106 2016-03-10 11:17 /user/oracle/ggflume/FlumeData.1457626634622 [root@sandbox ~]# hadoop fs -tail /user/oracle/ggflume/FlumeData.1457626634620 { "name" : "PK_ID", "type" : [ "null", "string" ], "default" : null }, { "name" : "PK_ID_isMissing", "type" : "boolean" }, { "name" : "RND_STR", "type" : [ "null", "string" ], "default" : null }, { "name" : "RND_STR_isMissing", "type" : "boolean" .................. [root@sandbox ~]# hadoop fs -tail /user/oracle/ggflume/FlumeData.1457626634621 "string" }, { "name" : "primary_keys", "type" : { "type" : "array", "items" : "string" } }, { "name" : "tokens", "type" : { "type" : "map", "values" : "string" }, ........................... [root@sandbox ~]# hadoop fs -tail /user/oracle/ggflume/FlumeData.1457626634622 :?v??8????? SaQm?"BDTEST.TEST_TAB_1Ñ?? ?"BDTEST.TEST_TAB_1I42016-02-16 19:17:40.74669942016-03-10T11:17:14.448000(00000000-10000002012 PK_ID1371O62FX&2014-01-24:19:09:20RJ68QYM5&2014-01-22:12:14:30"BDTEST.TEST_TAB_1Ñ?? ?"BDTEST.TEST_TAB_1I42016-02-16 19:17:40.74669942016-03-10T11:17:14.459000(00000000-10000002155 PK_ID2371O62FX&2014-01-24:19:09:20HW82LI73&2014-05-11:05:23:23"BDTEST.TEST_TAB_1Ñ?? ?"BDTEST.TEST_TAB_1I42016-02-16 19:17:40.74669942016-03-10T11:17:14.459001(00000000-10000002298 PK_ID3RXZT5VUN&2013-09-04:23:32:56RJ68QYM5&2014-01-22:12:14:30"BDTEST.TEST_TAB_1Ñ?? ?"BDTEST.TEST_TAB_1I42016-02-16 19:17:40.74669942016-03-10T11:17:14.460000(00000000-10000002441 PK_ID4RXZT5VUN&2013-09-04:23:32:56HW82LI73&2014-05-11:05:23:23"BDTEST.TEST_TAB_2?????"BDTEST.TEST_TAB_2I42016-02-16 19:17:40.76289942016-03-10T11:17:14.466000(00000000-10000002926 PK_IDRND_STR_1ACC_DATE7IJWQRO7T&2013-07-07:08:13:52 [/code] The initial load has succeeded, and now we can create and start the proper ongoing replication to HDFS through Flume. Let's prepare a new parameter file for our permanent Flume replicat and starting it up. [code lang="text"] GGSCI (sandbox.localdomain) 2> edit param rflume REPLICAT rflume -- Trail file for this example is located in "dirdat/" directory -- Command to add REPLICAT -- add replicat rflume, exttrail dirdat/or TARGETDB LIBFILE libggjava.so SET property=dirprm/flume.props REPORTCOUNT EVERY 1 MINUTES, RATE GROUPTRANSOPS 10000 HANDLECOLLISIONS MAP ggtest.*, TARGET bdtest.*; GGSCI (sandbox.localdomain) 1> add replicat rflume, exttrail dirdat/or, begin now REPLICAT added. GGSCI (sandbox.localdomain) 2> start replicat rflume Sending START request to MANAGER ... REPLICAT RFLUME starting [/code] Let's insert a row and see what we get on the target system. [code lang="sql"] orclbd> insert into ggtest.test_tab_1 2 values (7,dbms_random.string('x', 8), sysdate-(7+dbms_random.value(0,1000)), 3 dbms_random.string('x', 8), sysdate-(6+dbms_random.value(0,1000))) ; 1 row inserted. orclbd> commit; Commit complete. orclbd> [/code] As soon as commit had been executed we received a couple of new files on HDFS where the first had the schema for the changed table, and the second had the data for the transaction or "payload". [code lang="text"] [root@sandbox ~]# hadoop fs -ls /user/oracle/ggflume ................. 
-rw-r--r--   1 flume oracle       1833 2016-03-10 11:17 /user/oracle/ggflume/FlumeData.1457626634620
-rw-r--r--   1 flume oracle       1762 2016-03-10 11:17 /user/oracle/ggflume/FlumeData.1457626634621
-rw-r--r--   1 flume oracle       1106 2016-03-10 11:17 /user/oracle/ggflume/FlumeData.1457626634622
-rw-r--r--   1 flume oracle       1833 2016-03-10 12:43 /user/oracle/ggflume/FlumeData.1457631817021
-rw-r--r--   1 flume oracle        605 2016-03-10 12:43 /user/oracle/ggflume/FlumeData.1457631817022
[root@sandbox ~]#
[root@sandbox ~]# hadoop fs -cat /user/oracle/ggflume/FlumeData.1457631817021
SEQ!org.apache.hadoop.io.LongWritable"org.apache.hadoop.io.BytesWritable???:]B?9?k??
]kTSa?m??{
  "type" : "record",
  "name" : "TEST_TAB_1",
  "namespace" : "BDTEST",
  "fields" : [ {
    "name" : "table",
    "type" : "string"
  }, {
    "name" : "op_type",
    "type" : "string"
  }, {
..............................
[root@sandbox ~]# hadoop fs -cat /user/oracle/ggflume/FlumeData.1457631817022
{EQ!org.apache.hadoop.io.LongWritable"org.apache.hadoop.io.BytesWritable~,?`?aHTZRB?Sa?ny
  "type" : "record",
  "name" : "generic_wrapper",
  "namespace" : "oracle.goldengate",
  "fields" : [ {
    "name" : "table_name",
    "type" : "string"
  }, {
    "name" : "schema_hash",
    "type" : "int"
  }, {
    "name" : "payload",
    "type" : "bytes"
  } ]
}Sa?nz?"BDTEST.TEST_TAB_1Ñ??  ?"BDTEST.TEST_TAB_1I42016-03-10 17:43:31.00169042016-03-10T12:43:33.464000(00000000080001408270
PK_ID7XYJN3Z31&2014-04-21:09:01:21FL6Z8RPN&2013-08-06:21:40:02
[/code]

I prepared and ran a small regression test of inserts and updates against the table using JMeter, pushing inserts and updates at a rate of about 29 transactions per second. Even with one Flume channel and my small Hadoop environment, it showed pretty good response times without throwing any errors. Flume put about 900 transactions into each HDFS file.

[code lang="text"]
-rw-r--r--   1 flume oracle     123919 2016-03-10 14:52 /user/oracle/ggflume/FlumeData.1457639485465
-rw-r--r--   1 flume oracle      35068 2016-03-10 14:52 /user/oracle/ggflume/FlumeData.1457639485466
-rw-r--r--   1 flume oracle     145639 2016-03-10 14:52 /user/oracle/ggflume/FlumeData.1457639485467
-rw-r--r--   1 flume oracle     178943 2016-03-10 14:52 /user/oracle/ggflume/FlumeData.1457639485468
-rw-r--r--   1 flume oracle     103285 2016-03-10 14:52 /user/oracle/ggflume/FlumeData.1457639485469

[oracle@sandbox Downloads]$ hadoop fs -cat /user/oracle/ggflume/FlumeData.1457639485467 | wc -l
804
[oracle@sandbox Downloads]$ hadoop fs -cat /user/oracle/ggflume/FlumeData.1457639485468 | wc -l
988
[oracle@sandbox Downloads]$ hadoop fs -cat /user/oracle/ggflume/FlumeData.1457639485469 | wc -l
570
[oracle@sandbox Downloads]$
[/code]

I also tried the "thrift" source for Flume, and it worked well too. To switch from "avro" to "thrift" I changed the value of the agent.sources.ogg1.type parameter in flume.conf and restarted the flume-ng agent. You also have to change client.type from default to thrift in your custom-flume-rpc.properties file. It worked fine, and I was able to get the information from the trail and write it to HDFS.
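For reference, these are the only two settings involved in the switch (a minimal sketch based on the configuration used above; everything else stays the same):

[code lang="text"]
# In the Flume agent configuration (flume.conf): switch the source type
agent.sources.ogg1.type = thrift

# In dirprm/custom-flume-rpc.properties: switch the RPC client type
client.type = thrift
[/code]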
[code lang="text"] [oracle@sandbox oggbd]$ ./replicat paramfile dirprm/irflume.prm reportfile dirrpt/irflume.rpt [oracle@sandbox oggbd]$ hadoop fs -ls /user/oracle/ggflume Found 3 items -rw-r--r-- 1 flume oracle 1833 2016-02-25 16:05 /user/oracle/ggflume/FlumeData.1456434311892 -rw-r--r-- 1 flume oracle 1762 2016-02-25 16:05 /user/oracle/ggflume/FlumeData.1456434311893 -rw-r--r-- 1 flume oracle 1106 2016-02-25 16:05 /user/oracle/ggflume/FlumeData.1456434311894 [oracle@sandbox oggbd]$ [oracle@sandbox oggbd]$ hadoop fs -cat /user/oracle/ggflume/FlumeData.1456434311892 SEQ!org.apache.hadoop.io.LongWritable"org.apache.hadoop.io.BytesWritableOG?????$?{qS@]?{ "type" : "record", "name" : "TEST_TAB_1", "namespace" : "BDTEST", "fields" : [ { "name" : "table", "type" : "string" }, { ..... [oracle@sandbox oggbd]$ hadoop fs -cat /user/oracle/ggflume/FlumeData.1456434311894 SEQ!org.apache.hadoop.io.LongWritable"org.apache.hadoop.io.BytesWritable?????t?E?9j??S@??"BDTEST.TEST_TAB_1Ñ?? ?"BDTEST.TEST_TAB_1I42016-02-16 19:17:40.74669942016-02-25T16:05:11.480000(00000000-10000002012 PK_ID1371O62FX&2014-01-24:19:09:20RJ68QYM5&2014-01-22:12:14:30"BDTEST.TEST_TAB_1Ñ?? ?"BDTEST.TEST_TAB_1I42016-02-16 19:17:40.74669942016-02-25T16:05:11.498000(00000000-10000002155 PK_ID2371O62FX&2014-01-24:19:09:20HW82LI73&2014-05-11:05:23:23"BDTEST.TEST_TAB_1Ñ?? ?"BDTEST.TEST_TAB_1I42016-02-16 19:17:40.74669942016-02-25T16:05:11.498001(00000000-10000002298 PK_ID3RXZT5VUN&2013-09-04:23:32:56RJ68QYM5&2014-01-22:12:14:30"BDTEST.TEST_TAB_1Ñ?? ?"BDTEST.TEST_TAB_1I42016-02-16 19:17:40.74669942016-02-25T16:05:11.499000(00000000-10000002441 PK_ID4RXZT5VUN&2013-09-04:23:32:56HW82LI73&2014-05-11:05:23:23"BDTEST.TEST_TAB_2?????"BDTEST.TEST_TAB_2I42016-02-16 19:17:40.76289942016-02-25T16:05:11.505000(00000000-10000002926 PK_IDRND_STR_1ACC_DATE7IJWQRO7T&2013-07-07:08:13:52[oracle@sandbox oggbd]$ [/code] You can see from the output that in the FlumeData.1456434311894 file we are getting the schema description and in the FlumeData.1456434311894 we have the data from the tables TEST_TAB_1 and TEST_TAB_2. Let's try some simple DDL commands. If we truncate a table: [code lang="sql"] orclbd> truncate table ggtest.test_tab_1; Table GGTEST.TEST_TAB_1 truncated. orclbd> [/code] It is not going to be replicated. If we are altering the table, we are not seeing it as a separate command, but it is going to be reflected in the new schema definition for any new transaction replicated to HDFS. You will get a file with new schema definition and the transaction itself in a next file. [code lang="sql"] orclbd> alter table ggtest.test_tab_1 add (new1 varchar2(10)); Table GGTEST.TEST_TAB_1 altered. orcl> insert into ggtest.test_tab_1 2 values (7,dbms_random.string('x', 8), sysdate-(7+dbms_random.value(0,1000)), 3 dbms_random.string('x', 8), sysdate-(6+dbms_random.value(0,1000)),'new_col' ); 1 row created. orcl> commit; Commit complete. orcl> [/code] [code lang="text"] [oracle@sandbox oggbd]$ hadoop fs -cat /user/oracle/ggflume/FlumeData.1457117136700 SEQ!org.apache.hadoop.io.LongWritable"org.apache.hadoop.io.BytesWritable????)0???4(SB?Yc{ "type" : "record", "name" : "TEST_TAB_1", "namespace" : "BDTEST", "fields" : [ { "name" : "table", "type" : "string" ........ ........ "name" : "NEW1", "type" : [ "null", "string" ], "default" : null }, { "name" : "NEW1_isMissing", "type" : "boolean" ........ [oracle@sandbox oggbd]$ hadoop fs -cat /user/oracle/ggflume/FlumeData.1457117136701 ........ 
}SB???"BDTEST.TEST_TAB_1?????"BDTEST.TEST_TAB_1I42016-03-04 18:45:30.00131442016-03-04T13:45:34.156000(00000000000000014363
PK_ID7U09D0CTU&2013-08-15:12:53:50W0BSUWLL&2013-08-16:09:28:12new_col
[/code]

As I mentioned in my previous post, a deeper investigation of the supported DDL changes will be the subject of a dedicated blog post. Here we can conclude that the adapter worked as expected and supported the flow of transactions from our Oracle database down to Flume using both Avro and Thrift sources. Of course, this is not a production implementation; it serves only as basic functional and elementary regression testing. For a serious production workflow we would need to design an appropriate architecture. In my next few posts I plan to check the Kafka and HBase adapters and see how they work. Stay tuned!
