Pythian Blog: Technical Track

Google Cloud Dataproc in ETL pipeline - part 1 (logging)

  Google Cloud Dataproc , now generally available, provides access to fully managed Hadoop and Apache Spark clusters, and leverages open source data tools for querying, batch/stream processing, and at-scale machine learning. To get more technical information on the specifics of the platform, refer to Google’s original blog post and product home page .   Having access to fully managed Hadoop/Spark based technology and powerful Machine Learning Library (MLlib) as part of Google Cloud Platform makes perfect sense as it allows you to reuse existing code and helps many to overcome the fear of being “locked into” one specific vendor while taking a step into big data processing in the cloud. That said, I would still recommend evaluating Google Cloud Dataflow first while implementing new projects and processes for its efficiency, simplicity and semantic-rich analytics capabilities, especially around stream processing.   When Cloud Dataproc was first released to the public, it received positive reviews. Many blogs were written on the subject with few taking it through some “tough” challenges on its promise to deliver cluster startup in "less than 90 seconds”. In general the product was well received, with the overall consensus that it is well positioned against the AWS EMR offering.   Being able, in a matter of minutes, to start Spark Cluster without any knowledge of the Hadoop ecosystem and having access to a powerful interactive shell such as Jupyter or Zeppelin is no doubt a Data Scientist’s dream. But with extremely fast startup/shutdown, “by the minute” billing and widely adopted technology stack, it also appears to be a perfect candidate for a processing block in bigger ETL pipelines. Orchestration, workflow engine, and logging are all crucial aspects of such solutions and I am planning to publish a few blog entries as I go through evaluation of each of these areas starting with Logging in this blog.  

Cloud Dataproc Logging

Cluster's system and daemon logs are accessible through cluster UIs as well as through SSH-ing to the cluster, but there is a much better way to do this. By default these logs are also pushed to Google Cloud Logging consolidating all logs in one place with flexible Log Viewer UI and filtering. One can even create custom log-based metrics and use these for baselining and/or alerting purposes. All cluster logs are aggregated under a "dataproc-hadoop” tag but “structPayload.filename” field can be used as a filter for specific log file.   In addition to relying on Logs Viewer UI, there is a way to integrate specific log messages into Cloud Storage or BigQuery for analysis. Just to get an idea on what logs are available by default, I have exported all Cloud Dataproc messages into BigQuery and queried new table with the following query:   SELECT structPayload.filename AS file_name, count(*) AS cnt FROM [dataproc_logs.dataproc_hadoop_20160217] WHERE metadata.labels.key='dataproc.googleapis.com/cluster_id' AND metadata.labels.value = 'cluster-2:205c03ea-6bea-4c80-bdca-beb6b9ffb0d6' GROUP BY file_name  
  • hadoop-hdfs-namenode-cluster-2-m.log
  • yarn-yarn-nodemanager-cluster-2-w-0.log
  • container_1455740844290_0001_01_000004.stderr
  • hadoop-hdfs-secondarynamenode-cluster-2-m.log
  • hive-metastore.log
  • hadoop-hdfs-datanode-cluster-2-w-1.log
  • hive-server2.log
  • container_1455740844290_0001_01_000001.stderr
  • container_1455740844290_0001_01_000002.stderr
  • hadoop-hdfs-datanode-cluster-2-w-0.log
  • yarn-yarn-nodemanager-cluster-2-w-1.log
  • yarn-yarn-resourcemanager-cluster-2-m.log
  • container_1455740844290_0001_01_000003.stderr
  • mapred-mapred-historyserver-cluster-2-m.log
  Google Cloud Logging is a customized version of fluentd - an open source data collector for unified logging layer. In addition to system logs and its own logs, fluentd is configured (refer to /etc/google-fluentd/google-fluentd.conf on master node) to tail hadoop, hive, and spark message logs as well as yarn application logs and pushes them under "dataproc-hadoop” tag into Google Cloud Logging.

Application Logging

You can submit a job to the cluster using Cloud Console, Cloud SDK or REST API. Cloud Dataproc automatically gathers driver (console) output from all the workers, and makes it available through Cloud Console . Logs from the job are also uploaded to the staging bucket specified when starting a cluster and can be accessed from there.   Note: One thing I found confusing is that when referencing driver output directory in Cloud Dataproc staging bucket you need Cluster ID (dataproc-cluster-uuid), however it is not yet listed on Cloud Dataproc Console. Having this ID or a direct link to the directory available from the Cluster Overview page is especially critical when starting/stopping many clusters as part of scheduled jobs. One way to get dataproc-cluster-uuid and a few other useful references is to navigate from Cluster "Overview" section to "VM Instances" and then to click on Master or any worker node and scroll down to "Custom metadata” section. Indeed, you can also get it using " gcloud beta dataproc clusters describe <CLUSTER_NAME> |grep clusterUuid" command but it would be nice to have it available through the console in a first place.   The job (driver) output however is currently dumped into console ONLY (refer to /etc/spark/conf/log4j.properties on master node) and although accessible through Dataproc Job interface, it is not currently available in Cloud Logging.   The easiest way around this issue, which can be easily implemented as part of Cluster initialization actions, is to modify /etc/spark/conf/log4j.properties by replacing " log4j.rootCategory=INFO, console ” with " log4j.rootCategory=INFO, console, file ” and add the following appender:   # Adding file appender log4j.appender.file=org.apache.log4j.RollingFileAppender log4j.appender.file.File=/var/log/spark/spark-log4j.log log4j.appender.file.layout=org.apache.log4j.PatternLayout log4j.appender.file.layout.conversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c: %m%n   Existing Cloud Dataproc fluentd configuration will automatically tail through all files under /var/log/spark directory adding events into Cloud Logging and should automatically pick up messages going into /var/log/spark/spark-log4j.log .   You can verify that logs from the job started to appear in Cloud Logging by firing up one of the examples provided with Cloud Dataproc and filtering Logs Viewer using the following rule: node.metadata.serviceName="dataproc.googleapis.com" structPayload.filename="spark-log4j.log"   If after this change messages are still not appearing in Cloud Logging, try restarting fluentd daemon by running "/etc/init.d/google-fluentd restart” command on master node. Once changes are implemented and output is verified you can declare logger in your process as: import pyspark sc = pyspark.SparkContext() logger = sc._jvm.org.apache.log4j.Logger.getLogger(__name__) and submit the job redefining logging level (INFO by default) using "--driver-log-levels". Learn more here.

No Comments Yet

Let us know what you think

Subscribe by email