
Apache Beam pipelines with Scala: part 1 - template

In this 3-part series I'll show you how to build and run Apache Beam pipelines using the Java API in Scala. In the first part we will develop the simplest streaming pipeline, which reads JSON messages from Google Cloud Pub/Sub, converts them into TableRow objects and inserts them into a Google Cloud BigQuery table. Then we will run our pipeline with sbt on the local runner, and after that deploy it on Google Cloud Platform (GCP).

Prerequisites

You need a GCP project with the APIs enabled for Dataflow, Pub/Sub and BigQuery. Perform gcloud auth login, or create a service account with the proper permissions and download its key.json locally, and install the JDK and sbt. You also have to create a Pub/Sub topic and subscription, the target BigQuery table and a temp location in Google Cloud Storage.

Building the pipeline

First of all you need to create an instance of pipeline options. You may either use the fromArgs method or set the parameters manually:

[code language="scala"]
trait TestOptions extends PipelineOptions with DataflowPipelineOptions

val options = PipelineOptionsFactory.create().as(classOf[TestOptions])
[/code]

The next thing to do is to define the input subscription name and the output table reference object for the pipeline I/O:

[code language="scala"]
val fullSubscriptionName =
  s"projects/$projectId/subscriptions/$subscription"

val targetTable = new TableReference()
  .setProjectId(projectId)
  .setDatasetId(dataset)
  .setTableId(tableName)
[/code]

Now we can describe our DoFn function. It processes JSON string messages, trying to convert each one into a TableRow:

[code language="scala"]
class MyDoFn extends DoFn[String, TableRow] with LazyLogging {
  @ProcessElement
  def processElement(c: ProcessContext) {
    val inputString = c.element()
    logger.info(s"Received message: $inputString")
    Try {
      Transport.getJsonFactory.fromString(inputString, classOf[TableRow])
    } match {
      case Success(row) =>
        logger.info(s"Converted to TableRow: $row")
        c.output(row)
      case Failure(ex) =>
        logger.info(s"Unable to parse message: $inputString", ex)
    }
  }
}
[/code]

The last thing to do is to combine all the parts together using the pipeline object:

[code language="scala"]
val p = Pipeline.create(options)

p.apply("read-pubsub", PubsubIO
    .readStrings()
    .fromSubscription(fullSubscriptionName))
  .apply("process", ParDo.of(new MyDoFn))
  .apply("write-bq", BigQueryIO
    .writeTableRows()
    .to(targetTable)
    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER))
[/code]

And we are ready to run it:

[code language="scala"]
p.run()
[/code]

Run the pipeline locally

To start your pipeline locally you need to specify DirectRunner in the pipeline options.
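The post doesn't spell out the full options setup, so here is a minimal sketch of what setting the parameters manually for a local run might look like, reusing the TestOptions trait defined above. The project ID and bucket name are hypothetical placeholders, not values from the original pipeline:

[code language="scala"]
import org.apache.beam.runners.direct.DirectRunner
import org.apache.beam.sdk.options.PipelineOptionsFactory

// Minimal sketch -- "my-gcp-project" and "my-bucket" are hypothetical placeholders
val options = PipelineOptionsFactory.create().as(classOf[TestOptions])
options.setProject("my-gcp-project")          // project owning the Pub/Sub subscription and BigQuery table
options.setTempLocation("gs://my-bucket/tmp") // temp location in Google Cloud Storage
options.setStreaming(true)                    // Pub/Sub is an unbounded source
options.setRunner(classOf[DirectRunner])      // execute on the local (direct) runner
[/code]

The same values could instead be passed on the command line and picked up with PipelineOptionsFactory.fromArgs(args).withValidation().as(classOf[TestOptions]).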
With the runner configured, you can simply start your pipeline with the sbt run command:

[code language="scala"]
$ sbt run
[info] Loading settings from plugins.sbt ...
[info] Loading project definition from C:\Users\Valentin\workspace\beam-template\project
[info] Loading settings from build.sbt ...
[info] Set current project to beam-template (in build file:/C:/Users/Valentin/workspace/beam-template/)
[info] Running com.pythian.Beam
[/code]

Then you can publish a message from the cloud console into your topic to test it:

[code language="scala"]
{ "id": 1, "data": "test data" }
[/code]

In a while you should see something like:

[code language="scala"]
2017/12/11 01:54:16.581 INFO com.pythian.Beam$MyDoFn - Received message: { "id": 1, "data": "test data" }
2017/12/11 01:54:16.588 INFO com.pythian.Beam$MyDoFn - Converted to TableRow: {"id":1,"data":"test data"}
[/code]

You can now select your row from BigQuery (please note that the table preview won't show rows which are still in the streaming buffer):

[code language="scala"]
select * from test_nikotin.test where id = 1
[/code]

Run the pipeline in Dataflow

Once you are done with your tests you are ready to start the pipeline on GCP. Configure the runner to DataflowRunner (a sketch of the option changes follows at the end of this post) and run sbt:

[code language="scala"]
$ sbt run
[info] Loading settings from plugins.sbt ...
...
[info] Running com.pythian.Beam
...
2017/12/11 01:50:04.937 INFO o.a.b.r.dataflow.util.PackageUtil - Uploading 112 files from PipelineOptions.filesToStage to staging location to prepare for execution.
2017/12/11 01:50:09.093 INFO o.a.b.r.dataflow.util.PackageUtil - Staging files complete: 111 files cached, 1 files newly uploaded
2017/12/11 01:50:09.196 INFO o.a.b.r.d.DataflowPipelineTranslator - Adding read-pubsub/PubsubUnboundedSource as step s1
...
Dataflow SDK version: 2.1.0
2017/12/11 01:50:11.064 INFO o.a.b.r.dataflow.DataflowRunner - To access the Dataflow monitoring console, please navigate to https://console.developers.google.com/project/myproject/dataflow/job/2017-12-10_14_50_12-10326138943752681303
Submitted job: 2017-12-10_14_50_12-10326138943752681303
2017/12/11 01:50:11.064 INFO o.a.b.r.dataflow.DataflowRunner - To cancel the job using the 'gcloud' tool, run:
> gcloud beta dataflow jobs --project=myproject cancel 2017-12-10_14_50_12-10326138943752681303
[success] Total time: 17 s, completed Dec 11, 2017 1:50:11 AM
[/code]

You can navigate to the Dataflow service in the cloud console to verify it's running as expected, and check the logs in Cloud Logging.

You can find the code here.

In the second part I'll build a pipeline with a control flow from another Pub/Sub topic via a side input.
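For completeness, here is a minimal sketch of what switching the same options over to Dataflow might look like; as above, the project ID and bucket name are hypothetical placeholders rather than values from the original post:

[code language="scala"]
import org.apache.beam.runners.dataflow.DataflowRunner

// Minimal sketch, reusing the TestOptions instance from above;
// "my-gcp-project" and "my-bucket" are hypothetical placeholders
options.setRunner(classOf[DataflowRunner])           // submit the job to the Dataflow service
options.setProject("my-gcp-project")                 // GCP project to run the job in
options.setStagingLocation("gs://my-bucket/staging") // where the staged jar files are uploaded
options.setTempLocation("gs://my-bucket/tmp")        // temp location in Google Cloud Storage
options.setStreaming(true)                           // keep the unbounded Pub/Sub source streaming
[/code]

With these options, p.run() submits the job to the Dataflow service instead of executing it locally, which is why the log above shows the jars being staged and a job ID being returned.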
