Alex

Reputation: 27

How to Evaluate Different Spark Physical Plans

I have recently been deepening my knowledge of Spark query plans in order to start tuning the processes we have developed so far.

So I started testing a few things. I will share the scenario as much as possible, but what matters most is how to compare which approach is better.

We have a Delta table that we use to enrich our process (one of many, but since I'm still getting up to speed, I'm testing with only this one).

The main points are as follows. The code does something like this:

from pyspark.sql.functions import max

table1 = spark.read.format('delta').load('somepath')
df1 = table1.where("column1 = 'x' and column2 = 'y'")
df2 = table1.groupBy("column4", "column5").agg(max("DATE").alias("Date"))

df1 = df1.join(df2, (df1.con1 == df2.con1) & (df1.con2 == df2.con2) & (df1.con3 == df2.con3), how="inner") \
         .select("col1", "col2", "col3", "col4")
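
For reference, the plan below comes from a call along these lines (a sketch; I'm assuming Spark 3.x, where explain accepts a mode argument):

# Print the physical plan; the numbered tree shown next matches the "formatted" style of output
df1.explain(mode="formatted")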

Applying the explain method to see the query plan, I got the following:

== Physical Plan ==
AdaptiveSparkPlan (18)
+- Project (17)
   +- SortMergeJoin Inner (16)
      :- Sort (4)
      :  +- Exchange (3)
      :     +- Filter (2)
      :        +- Scan parquet  (1)
      +- Sort (15)
         +- Exchange (14)
            +- Filter (13)
               +- SortAggregate (12)
                  +- Sort (11)
                     +- Exchange (10)
                        +- SortAggregate (9)
                           +- Sort (8)
                              +- Project (7)
                                 +- Filter (6)
                                    +- Scan parquet  (5)

Understanding the business rules, I discovered I could do this in fewer steps by using a window function instead of the join, which led me to:

from pyspark.sql.window import Window
from pyspark.sql.functions import col, max

table1 = spark.read.format('delta').load('somepath').where("column1 = 'x' and column2 = 'y'")
windowSpec = Window.partitionBy("rule1", "rule2").orderBy(col("rule1").desc())
final = table1.withColumn("recent_value", max("DATE").over(windowSpec)) \
              .filter(col("DATE") == col("recent_value")) \
              .select("col1", "col2", "col3", "col4")

Applying explain to this approach, I got:

== Physical Plan ==
AdaptiveSparkPlan (8)
+- Project (7)
   +- Filter (6)
      +- Window (5)
         +- Sort (4)
            +- Exchange (3)
               +- Filter (2)
                  +- Scan parquet  (1)

As far as I understand, the second one is better because I avoid the SortMergeJoin and the extra Exchanges, which are expensive operations, and I also reduce the total number of steps.

In addition, I analyzed the cost using .explain("cost").

For the first plan it returned Statistics(sizeInBytes=3.6 TiB, ColumnStat: N/A), which is huge.

For the second plan it returned Statistics(sizeInBytes=20.1 MiB, ColumnStat: N/A), which is nothing compared to the first one, considering the final state of the DataFrame.

I used this post as a reference.
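
For anyone reproducing this, a minimal sketch of how those statistics are obtained (assuming Spark 3.x, where explain accepts a mode string); the numbers come from the optimized logical plan:

# "cost" mode prints the optimized logical plan annotated with Statistics(sizeInBytes=..., ...)
df1.explain("cost")     # first approach (join)
final.explain("cost")   # second approach (window)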

Finally, considering that I'm applying a window, which as I understand it means bringing everything onto the driver or one single node, I validated the partitions in each method.

The original had 4 partitions and the new one had 1 partition. Considering that it is a window, and seeing there was still one Exchange, I applied a Coalesce; it returned the same query plan, but with a Coalesce instead of the Exchange.
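
Roughly what that check and the change look like (a sketch, reusing table1, windowSpec, df1 and final from above):

# Compare the number of output partitions of each approach
print(df1.rdd.getNumPartitions())    # first approach: 4 in my case
print(final.rdd.getNumPartitions())  # window approach: 1 in my case

# Coalescing the filtered input to one partition replaces the Exchange with a Coalesce
final = table1.coalesce(1) \
              .withColumn("recent_value", max("DATE").over(windowSpec)) \
              .filter(col("DATE") == col("recent_value")) \
              .select("col1", "col2", "col3", "col4")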

The plan becomes the following:

== Physical Plan ==
AdaptiveSparkPlan (8)
+- Project (7)
   +- Filter (6)
      +- Window (5)
         +- Sort (4)
            +- Coalesce (3)
               +- Filter (2)
                  +- Scan parquet  (1)

Finally, I made one last change to see how it affects the query plan: instead of doing the select of the needed columns at the end, I did it at the beginning.

It returned the following plan:

== Physical Plan ==
AdaptiveSparkPlan (8)
+- Filter (7)
   +- Window (6)
      +- Sort (5)
         +- Coalesce (4)
            +- Project (3)
               +- Filter (2)
                  +- Scan parquet  (1)

The only difference is the order of execution.
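
For clarity, that last variant looks roughly like this (a sketch; it assumes the early select keeps every column the window and filter still need, for example rule1, rule2 and DATE):

final = table1.select("col1", "col2", "col3", "col4", "rule1", "rule2", "DATE") \
              .coalesce(1) \
              .withColumn("recent_value", max("DATE").over(windowSpec)) \
              .filter(col("DATE") == col("recent_value"))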

So, my questions are as follows, and I would appreciate your help (if you have a post or any link, feel free to share):

Correct me if I'm wrong, but first, as I said before:

Upvotes: 0

Views: 256

Answers (1)

  • In the first approach the self-join uses a SortMergeJoin after filtering, which requires sorting and shuffling data across the cluster. It also involves multiple Exchange and SortAggregate operations.
  • Rather than using the self-join, using a window function, which avoids the SortMergeJoin and the extra Exchange, makes it more efficient.

In the second approach, a Window operation finds the maximum date per partition and a Filter keeps only the most recent records. So it is better to use a window function here: it works on already partitioned and ordered data, which is more efficient than a self-join when looking for the most recent records in grouped data.

As for your question about whether it is better to use the Coalesce or to keep the Exchange, and why:

I have created an example applying the window function to find the most recent purchase for each customer:

from pyspark.sql.window import Window
from pyspark.sql.functions import col, max

# df is a DataFrame with columns CustomerID, Product, PurchaseDate, Amount
windowSpec = Window.partitionBy("CustomerID").orderBy(col("PurchaseDate").desc())
df_with_recent = df.withColumn("MostRecentPurchase", max("PurchaseDate").over(windowSpec)) \
    .filter(col("PurchaseDate") == col("MostRecentPurchase")) \
    .select("CustomerID", "Product", "PurchaseDate", "Amount")

Results:

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- == Initial Plan ==
   Project [CustomerID#2L, Product#3, PurchaseDate#4, Amount#5L]
   +- Filter ((isnotnull(PurchaseDate#4) AND isnotnull(MostRecentPurchase#10)) AND (PurchaseDate#4 = MostRecentPurchase#10))
      +- Window [CustomerID#2L, Product#3, PurchaseDate#4, Amount#5L, max(PurchaseDate#4) windowspecdefinition(CustomerID#2L, PurchaseDate#4 DESC NULLS LAST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS MostRecentPurchase#10], [CustomerID#2L], [PurchaseDate#4 DESC NULLS LAST]
         +- Sort [CustomerID#2L ASC NULLS FIRST, PurchaseDate#4 DESC NULLS LAST], false, 0
            +- Exchange hashpartitioning(CustomerID#2L, 200), ENSURE_REQUIREMENTS, [plan_id=129]
               +- Scan ExistingRDD[CustomerID#2L,Product#3,PurchaseDate#4,Amount#5L]

Then I coalesced the DataFrame to a single partition and applied the same window function:

df_coalesced = df.coalesce(1)
window_spec = Window.partitionBy("CustomerID").orderBy(col("PurchaseDate").desc())
df_with_recent = df_coalesced.withColumn("recent_value", max("PurchaseDate").over(window_spec))
df_result = df_with_recent.filter(col("PurchaseDate") == col("recent_value"))

Results:

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- == Initial Plan ==
   Filter ((isnotnull(PurchaseDate#94) AND isnotnull(recent_value#100)) AND (PurchaseDate#94 = recent_value#100))
   +- Window [CustomerID#92L, Product#93, PurchaseDate#94, Amount#95L, max(PurchaseDate#94) windowspecdefinition(CustomerID#92L, PurchaseDate#94 DESC NULLS LAST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS recent_value#100], [CustomerID#92L], [PurchaseDate#94 DESC NULLS LAST]
      +- Sort [CustomerID#92L ASC NULLS FIRST, PurchaseDate#94 DESC NULLS LAST], false, 0
         +- Coalesce 1
            +- Scan ExistingRDD[CustomerID#92L,Product#93,PurchaseDate#94,Amount#95L]

In the above code I apply coalesce to the DataFrame to reduce the number of partitions, then apply the window function to find the most recent purchase for each CustomerID.
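
To confirm the effect of the coalesce, you can also check the partition count before and after (a quick sketch):

print(df.rdd.getNumPartitions())            # partitions before coalesce
print(df_coalesced.rdd.getNumPartitions())  # 1 after coalesce(1)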

  • I agree with @Ganesh Chandrasekaran: it is better to filter first and then select the necessary columns when working with Spark, because by filtering out unnecessary rows early you manage memory usage better (see the sketch after this list).
  • Project (select) is better for performance when you have a wide DataFrame and only need a subset of columns for the next operations.
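
A minimal illustration of that ordering on the example DataFrame above (the Amount > 0 predicate is just a hypothetical filter to show the pattern):

# Drop unneeded rows first, then keep only the columns the rest of the job needs
df_small = df.filter(col("Amount") > 0) \
             .select("CustomerID", "PurchaseDate", "Amount")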

Upvotes: 0
