Pythian Blog: Technical Track

Spark performance regression with sum aggregations

We found an interesting bug during a recent performance tuning exercise on Spark 2.2 (2.3 is also affected). The workload was a batch Spark job scheduled to run hourly and to process about 1 TB of data stored in Parquet format. We tuned a number of things, which gave us roughly a 10% performance boost. While looking for known issues we found SPARK-21657, which is related to explode. Although our job used explode, it didn't cause any problems because the number of exploded rows was low. It did, however, push us to take a closer look at whole-stage code generation. A simple test with spark.sql.codegen.wholeStage disabled resulted in 30-40% time savings. We spent another day working through each piece of the code and arrived at a test case with nothing but sum aggregations.

[code language="scala"]
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{lit, sum, udf}

val cnt = 50        // number of columns to aggregate
val rows = 5000000  // number of input rows
val dummy = udf(() => 1)

// Append cnt constant-valued columns (col_0, col_1, ...) produced by the UDF
def addConstColumns(inputDF: DataFrame) =
  (0 until cnt).foldLeft(inputDF)((df, idx) => df.withColumn(s"col_$idx", dummy()))

spark.range(rows).toDF()
  .withColumn("grp", lit(1))
  .transform(addConstColumns)
  .groupBy("grp")
  .agg(sum("col_0"), (1 until cnt).map(idx => sum(s"col_$idx")): _*)
  .collect()
[/code]

This code creates a DataFrame with 50 columns that are sum-aggregated. I ran this test locally on a laptop (HotSpot JVM) with WHOLESTAGE_CODEGEN_ENABLED=true/false and saw timings of about 15 sec with it enabled vs 3 sec with it disabled. As a result, I filed Spark Jira issue SPARK-23791. It turned out there were already very similar issues, SPARK-20184 and SPARK-20479, targeted at Spark 2.4.
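To reproduce the comparison, the same aggregation can be run twice in one session, flipping spark.sql.codegen.wholeStage between runs. The following is only a minimal sketch: the object name CodegenTiming, the helper timeMs, and the local[*] master are my own choices for illustration, and a single wall-clock measurement like this includes JVM warm-up in the first run, so treat the numbers as rough.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{lit, sum, udf}

// Hypothetical helper object, not part of Spark.
object CodegenTiming {
  // Runs `body` once and returns the elapsed wall-clock time in milliseconds.
  def timeMs(body: => Unit): Long = {
    val start = System.nanoTime()
    body
    (System.nanoTime() - start) / 1000000L
  }

  // Builds `cnt` constant columns from a UDF and sum-aggregates all of them.
  def runSumAggregation(spark: SparkSession, cnt: Int, rows: Long): Unit = {
    val dummy = udf(() => 1)
    val base = spark.range(rows).toDF().withColumn("grp", lit(1))
    val wide = (0 until cnt).foldLeft(base) { (df, idx) =>
      df.withColumn(s"col_$idx", dummy())
    }
    wide.groupBy("grp")
      .agg(sum("col_0"), (1 until cnt).map(idx => sum(s"col_$idx")): _*)
      .collect()
  }

  def main(args: Array[String]): Unit = {
    // Assumption: a local session is good enough for a laptop-scale test.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("codegen-timing")
      .getOrCreate()

    for (enabled <- Seq(true, false)) {
      // The setting is read when the query is planned, so set it before the action.
      spark.conf.set("spark.sql.codegen.wholeStage", enabled.toString)
      val ms = timeMs(runSumAggregation(spark, cnt = 50, rows = 5000000L))
      println(s"spark.sql.codegen.wholeStage=$enabled took $ms ms")
    }
    spark.stop()
  }
}
```

Repeating each setting a few times and discarding the first run should give steadier numbers than a single pass.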
After a series of tests we also found that with cnt <= 13 or cnt >= 100 there was no noticeable difference; with 14 <= cnt <= 83 there was about a 4-7x difference; and with 84 <= cnt <= 99 the code simply failed with a nasty error:

[code language="scala"]
java.lang.ClassFormatError: Too many arguments in method signature in class file org/apache/spark/sql/catalyst/expressions/GeneratedClass$GeneratedIteratorForCodegenStage2
	at java.lang.ClassLoader.defineClass1(Native Method)
[/code]

If you have a performance-critical Spark job that performs a lot of aggregations with the DataFrame API, be careful: there may be time savings to be had by disabling whole-stage code generation.
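If only one aggregation-heavy step is affected, it may be preferable to turn whole-stage codegen off just around that step rather than for the whole application. Here is a minimal sketch of that idea, assuming the action is triggered inside the wrapped block; CodegenToggle and withWholeStageCodegen are hypothetical names of mine, not Spark APIs, while spark.conf.getOption, set and unset are the regular runtime config calls.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical helper object, not part of Spark.
object CodegenToggle {
  val Key = "spark.sql.codegen.wholeStage"

  // Runs `body` with whole-stage codegen set to `enabled`,
  // then restores whatever the session had before.
  def withWholeStageCodegen[T](spark: SparkSession, enabled: Boolean)(body: => T): T = {
    val previous = spark.conf.getOption(Key)
    spark.conf.set(Key, enabled.toString)
    try body
    finally previous match {
      case Some(value) => spark.conf.set(Key, value)
      case None        => spark.conf.unset(Key)
    }
  }
}
```

Usage would look something like `CodegenToggle.withWholeStageCodegen(spark, enabled = false) { df.groupBy("grp").agg(...).collect() }`. The action (collect, write, and so on) has to run inside the block, because DataFrame transformations are lazy and the setting only matters when the query is actually planned and executed. Whether this helps depends on the job, so it is worth measuring both ways before committing to it.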
