Join, Group By, and Aggregate in Cloud Data Fusion

15 min read
Apr 9, 2020

Good news! Cloud Data Fusion is now GA. Announced at Google Next ‘19 UK on November 21, 2019, Cloud Data Fusion is a fully managed, cloud-native, enterprise data integration service for quickly building and managing data pipelines. The Cloud Data Fusion web UI lets you build scalable data integration solutions to clean, prepare, blend, transfer, and transform data, without having to manage the infrastructure. Cloud Data Fusion is powered by the open source project CDAP. Wondering how to get started with Cloud Data Fusion? This post shows you how to use the Wrangler and Data Pipelines features in Cloud Data Fusion to clean, transform, and process flight data.

ETL Process

The diagram below shows the transformations that will take place: reading the two files, transforming the data, and loading it into one output, Total Flights per Airline.
etl

Objectives

  • Connect Cloud Data Fusion to data sources.
  • Apply basic transformations.
  • Join and Group By data sources.
  • Write to a sink.
You're ready to begin!

Log in to the GCP Console

First, go to the GCP console and log in using your Google account.

Select or Create a GCP Project

Select a project. If you don’t have any projects, go to the project selector page to create one. For this post, I've created a specific project named flights-analysis. I highly recommend you create a new project for this walkthrough. See Creating your project for more information.

Create GCS Bucket and Copy Data

You need data! The two small sample datasets live in an existing GCS bucket, and you'll copy them into a bucket of your own. First, create your bucket. You can do this by typing "bucket" in the resources and products search field and then selecting Create bucket.
bucket
In the Create a Bucket page, enter the name of your bucket (1). Remember that bucket names are globally unique; for the purpose of this demonstration, we'll use flights-analysis-data2. Select Region (2) for Location type, and select the desired Location (3). We'll use northamerica-northeast1 for this demonstration. Click Create (4) to create the bucket.
bucket_create
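If you prefer the command line, the same bucket can be created from Cloud Shell (activated in the next step). A minimal sketch, using the name and location chosen above; remember your bucket name must be globally unique:

gsutil mb -l northamerica-northeast1 gs://flights-analysis-data2/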

Activate Cloud Shell

Next, activate Cloud Shell. In the GCP console, click the Open Cloud Shell icon (top-right of the toolbar). You can click Continue when the dialog box appears.
cloud_shell
It takes a few moments to provision and connect to the environment. When you are connected, you are already authenticated and the project is set to your PROJECT_ID. The output is similar to the following:
cloud_shell2
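If Cloud Shell isn't pointing at the right project, you can set it yourself. Replace [PROJECT_ID] with your own; the second command echoes the active project so you can confirm the change:

gcloud config set project [PROJECT_ID]
gcloud config get-value project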
You need to execute the following commands to copy the required sample files and structure to your GCS bucket. Replace [BUCKET_NAME] with the name of the bucket you created earlier.
gsutil cp gs://flights-analysis-sample-data/input/airlines.csv gs://[BUCKET_NAME]/input/
gsutil cp gs://flights-analysis-sample-data/input/flights_small.csv gs://[BUCKET_NAME]/input/
The output is similar to the following:
gsutil_copy
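To double-check that both files landed in your bucket, list the input folder (again substituting your bucket name):

gsutil ls -l gs://[BUCKET_NAME]/input/

You should see airlines.csv and flights_small.csv with non-zero sizes.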

Create a Cloud Data Fusion Instance

You are now ready to create your Cloud Data Fusion instance.
  1. Go to the Cloud Data Fusion page. You can do this by typing "data fusion" in the resources and products search field and then selecting Data Fusion.
    datafusion-00
  2. If the Cloud Data Fusion API is not already enabled, activate it by clicking Enable. This might take a few moments to complete.
    datafusion-01
  3. Navigate back to the Data Fusion page. You are now ready to create a Data Fusion instance. Click Create An Instance.
    datafusion-02
  4. Enter an Instance name (1), select your Region (2), select Basic for Edition (3), and click Create (4) to deploy your instance. For the purpose of this exercise, we're using flights_data-etl as the name of the instance and northamerica-northeast1 as the region. You can enter your own values for these fields. Note: This requires several minutes to complete. (If you prefer the command line, see the sketch after this list.)
    datafusion-03
  5. After the instance deploys successfully, a green check mark appears. Click View Instance to continue.
    datafusion-04
  6. The next page provides details about the instance. Click View instance to go to your Cloud Data Fusion instance.
    datafusion-05
  7. Great work! You are now in your Cloud Data Fusion instance. To start designing your data pipeline, click Studio to continue.
    datafusion-06
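As an aside, enabling the API and creating the instance can also be scripted from Cloud Shell. A minimal sketch, assuming the beta gcloud data-fusion commands available at the time of writing; replace the placeholders with your own values:

# Enable the Cloud Data Fusion API
gcloud services enable datafusion.googleapis.com
# Create a Basic edition instance (takes several minutes, as in the UI)
gcloud beta data-fusion instances create [INSTANCE_NAME] \
    --location=[REGION] \
    --edition=basic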

Building the Data Pipeline

  1. After accessing the Studio canvas, you are now ready to build the data pipeline. Make sure you are in Source (1) view, and then click GCS (2). A GCS Source appears on the canvas. Click Properties (3) from the GCS Source to continue.
    datafusion-07
  2. The GCS Properties configuration page appears. Specify the following values: - Label (1): “GCS - Flights Data” - Reference Name (2): “gcs_flights_data” - Path (3): Enter the path to the GCS bucket you created earlier (where you saved the flights_small.csv file). For the purpose of this exercise, I included mine.
  3. Click Validate (4) to validate the information on the page. You should see (in green) "No errors found." Click X (5) to close/save the GCS Properties page.
    datafusion-08
  4. The next step is to "wrangle" the flights_small.csv dataset. Make sure you are in Transform (1) view, and then click the Wrangler (2). A Wrangler transform appears on the canvas.
  5. Connect (3) by clicking and dragging the small arrow from the GCS - Flights Data source to the Wrangler transform. Click Properties (4) in the Wrangler transform to continue.
    datafusion-09
  6. The Wrangler Properties configuration page appears. Enter “Wrangler - Flights Data” for Label (1), then click Wrangle (2) to continue.
    datafusion-10
  7. Select the flights_small.csv file located in the GCS bucket you created earlier to continue.
    datafusion-11
  8. Next is a series of steps to parse and remove unwanted columns. Click the drop-down [Column transformations] in the body (1) column, and go to Parse -> CSV (2). Click Comma (3) as the delimiter, check Set first row as header (4), then click Apply (5) to continue.
    datafusion-12
  9. The parse transform added new columns that we must now clean up. Select the highlighted columns (1), click the drop-down [Column transformations] in the body (2) column, then select Delete selected columns (3) to continue.
    datafusion-13
  10. The results now show only the columns that you need to move forward. Click Apply to continue.
    datafusion-14
  11. Notice that the Recipe box is now populated with the directives you just specified. Click Validate (1) to validate the page. You should see (in green) "No errors found." Lastly, click X (2) to close/save the Wrangler Properties page.
    datafusion-15
  12. Now, let’s add the airlines.csv dataset. Make sure you are in Source (1) view, then click GCS (2). Another GCS source appears on the canvas. Click Properties (3) from the GCS Source to continue.
    datafusion-16
  13. The GCS Properties configuration page appears. Specify the following values: - Label (1): “GCS - Airlines Data” - Reference Name (2): “gcs_airlines_data” - Path (3): Enter the path to the GCS bucket you created earlier (where you saved the airlines.csv file). For the purpose of this exercise, I included mine.
  14. Click Validate (4) to validate the page. You should see (in green) "No errors found." Lastly, click X (5) to close/save the GCS Properties page.
    datafusion-17
  15. Similar to what you did earlier for the flights_small.csv dataset, we now must wrangle the airlines.csv dataset. Make sure you are in Transform (1) view, then click Wrangler (2). Another Wrangler transform appears on the canvas.
  16. Connect (3) by clicking and dragging the small arrow from the GCS - Airlines Data source to the newly created Wrangler transform. Click Properties (4) in the Wrangler transform to continue.
    datafusion-18
  17. The Wrangler Properties configuration page appears. Enter “Wrangler - Airlines Data” in Label (1). Click Wrangle (2) to continue.
    datafusion-19
  18. Similar to what you did earlier, select the airlines.csv file in your GCS bucket to continue.
    datafusion-11
  19. Next is a series of steps to parse and remove unwanted columns. Click the drop-down [Column transformations] in the body (1) column, and go to Parse -> CSV (2). Select Comma (3) as the delimiter, select Set first row as header (4), then click Apply (5) to continue.
    datafusion-20
  20. The parse transform again left a column we must remove, this time the original body column. Click the drop-down [Column transformations] in the body (1) column, select Delete column (2), then click Apply (3) to continue.
    datafusion-21
  21. Notice that the Recipe box is now populated with the directives you just created. Click Validate (1) to validate the page. You should see (in green) "No errors found." Lastly, click X (2) to close/save the Wrangler Properties page.
    datafusion-22
  22. You are now ready to join the two datasets. Make sure you are in Analytics (1) view, then click Joiner (2). A Joiner task appears on the canvas.
  23. Connect (3) by clicking and dragging the small arrow from the Wrangler - Flights Data to the Joiner. Do the same for the Wrangler - Airlines Data. Click Properties (4) from the Joiner task to continue.
    datafusion-23
  24. The Joiner Properties configuration box appears. In the Join - Fields section, expand the Wrangler - Airlines Data (1) and clear the Code (2) checkbox. Enter “Airline_name” in the Description (3) field and select Inner (4) for the Join Type.
  25. In the Join Condition (5) section, enter “Airline” in the Wrangler - Flights Data field, and “Code” in the Wrangler - Airlines Data field. Click Get (6) for the schema, then click Validate (7) to validate the page. You should see (in green) "No errors found." Lastly, click X (8) to close/save the Joiner Properties page.
    datafusion-24
  26. Next, we must "group by" and aggregate. Make sure you are in Analytics (1) view, then click Group By (2). A Group By task appears on the canvas.
  27. Connect (3) by clicking and dragging the small arrow from the Joiner to the Group By task. Click Properties (4) from the Group By task to continue.
    datafusion-25
  28. The Group By Properties configuration box appears. In the Group by fields section (1), add the following fields: "Airline," "Airline_code," and "Airline_name."
  29. In the Aggregates (2) section, enter “Departure_schedule”, select Count as the aggregation, and enter “Flight_count” as an alias. Click Get Schema (3), then click Validate (4) to validate the page. You should see (in green) "No errors found." Lastly, click X (5) to close/save the Group By Properties page.
    datafusion-26
  30. To complete your data pipeline, you must set up a sink to output the results. Make sure you are in Sink (1) view, then click GCS (2). A GCS Sink appears on the canvas.
  31. Connect (3) by selecting and dragging the small arrow from the Group By task to the GCS Sink. Click Properties (4) from the GCS Sink to continue.
    datafusion-27
  32. The GCS Properties configuration page appears. Specify the following: - Label (1): “GCS - Flight Count” - Reference Name (2): “gcs_flight_count” - Path (3): Enter the path to the GCS bucket you created earlier. For the purpose of this exercise, I included mine.
  33. Select csv for the Format (4) of the output file, then click Validate (5) to validate the page. You should see (in green) "No errors found." Lastly, click X (6) to close/save the GCS Properties page.
    datafusion-28
  34. Congrats! You have completed building your data pipeline. Now give your pipeline a Name (1), Save (2) it, then Deploy (3) it. It takes a few moments to finalize and complete the deployment process.
    datafusion-28_1
  35. You are now ready to execute your pipeline. Click Run.
    datafusion-29
  36. The pipeline goes through several Status cycles: Deployed, Provisioning, Starting, Running, Deprovisioning, and Succeeded. When the pipeline Status is "Succeeded," examine the output file in your GCS bucket.
    datafusion-30
  37. To validate the output sink of your pipeline, go to the GCS bucket output folder and execute the following gsutil command to view the results. Make sure to replace [BUCKET_NAME] and [REPLACE_WITH_YOUR_FOLDER_DATE] with your information:
gsutil cat -h gs://[BUCKET_NAME]/output/flight_count/[REPLACE_WITH_YOUR_FOLDER_DATE]/part-r-00000
The output is similar to the following:
datafusion-31
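If you're unsure of the dated folder name your run produced, listing the output prefix will reveal it (substitute your bucket name):

gsutil ls gs://[BUCKET_NAME]/output/flight_count/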
That’s it! You’ve just created and run a complete data pipeline process on Cloud Data Fusion.
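As an aside, a deployed pipeline can also be started without the UI, through the CDAP REST API that backs Cloud Data Fusion. A minimal sketch, assuming a batch pipeline (which runs as the DataPipelineWorkflow program) and the beta gcloud data-fusion commands; [INSTANCE_NAME], [REGION], and [PIPELINE_NAME] are placeholders for your own values:

# Look up the instance's CDAP API endpoint
CDAP_ENDPOINT=$(gcloud beta data-fusion instances describe [INSTANCE_NAME] \
    --location=[REGION] --format="value(apiEndpoint)")
# Start the deployed pipeline, authenticating with an OAuth access token
curl -X POST \
    -H "Authorization: Bearer $(gcloud auth print-access-token)" \
    "${CDAP_ENDPOINT}/v3/namespaces/default/apps/[PIPELINE_NAME]/workflows/DataPipelineWorkflow/start"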

Cleanup

To avoid incurring charges to your Google Cloud Platform account for the resources used in this post, clean up when you're finished. If you want to delete the entire project:
  1. In the GCP Console, go to the Manage resources page.
  2. In the project list, select the project you want to delete and click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.
If you just want to delete your Cloud Data Fusion instance:
  1. To view your existing Cloud Data Fusion instances, open the Instances page.
  2. To select an instance, check the box next to the instance name.
  3. To delete the instance, click Delete.
Note: You can also delete an instance by clicking Delete on the Instance Details page.
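Alternatively, the demo bucket and the project can both be removed from Cloud Shell. A quick sketch; both deletions are irreversible, so double-check the names first:

# Delete the bucket and everything in it
gsutil rm -r gs://[BUCKET_NAME]
# Delete the entire project
gcloud projects delete [PROJECT_ID]

Enjoy!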
