Reputation: 27
I have recently been deepening my knowledge of Spark query plans in order to start tuning the processes we have developed so far.
So I started to test some things. I will share the scenario as much as possible, but what matters most is how to compare which approach is better.
We have a Delta table that we use to enrich our process (one of many, but since I'm still getting my head around this, I'm testing with only one).
The main points are as follows. The code does something like this:
from pyspark.sql.functions import max  # needed for the aggregation below

table1 = spark.read.format('delta').load('somepath')
df1 = table1.where("column1 = 'x' and column2 = 'y'")
df2 = table1.groupBy("column4", "column5").agg(max("DATE").alias("Date"))
df1 = df1.join(df2, (df1.con1 == df2.con1) & (df1.con2 == df2.con2) & (df1.con3 == df2.con3), how="inner") \
    .select("col1", "col2", "col3", "col4")
Applying the explain method to see the query plan, I got the following:
== Physical Plan ==
AdaptiveSparkPlan (18)
+- Project (17)
   +- SortMergeJoin Inner (16)
      :- Sort (4)
      :  +- Exchange (3)
      :     +- Filter (2)
      :        +- Scan parquet (1)
      +- Sort (15)
         +- Exchange (14)
            +- Filter (13)
               +- SortAggregate (12)
                  +- Sort (11)
                     +- Exchange (10)
                        +- SortAggregate (9)
                           +- Sort (8)
                              +- Project (7)
                                 +- Filter (6)
                                    +- Scan parquet (5)
Understanding the business rules, I discovered I could do it in fewer steps using a window instead of the join itself, which led me to:
from pyspark.sql.functions import col, max
from pyspark.sql.window import Window

table1 = spark.read.format('delta').load('somepath').where("column1 = 'x' and column2 = 'y'")
windowSpec = Window.partitionBy("rule1", "rule2").orderBy(col("rule1").desc())
final = table1.withColumn("recent_value", max("DATE").over(windowSpec)) \
    .filter(col("DATE") == col("recent_value")).select("col1", "col2", "col3", "col4")
Applying explain to this approach, I got:
== Physical Plan ==
AdaptiveSparkPlan (8)
+- Project (7)
   +- Filter (6)
      +- Window (5)
         +- Sort (4)
            +- Exchange (3)
               +- Filter (2)
                  +- Scan parquet (1)
As far as I understand, the second one is better because I avoid extra Exchanges and the SortMergeJoin, which are expensive operations, and I also reduce the total number of steps.
In addition, I analyzed the cost using .explain("cost").
For the first plan it returned Statistics(sizeInBytes=3.6 TiB, ColumnStat: N/A), which is HUGE.
For the second plan it returned Statistics(sizeInBytes=20.1 MiB, ColumnStat: N/A), which is nothing compared to the first one, considering the final state of the dataframe.
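For reference, these are the explain calls used to produce the plans and statistics above (a minimal sketch; that the numbered plans come from the "formatted" mode is an assumption, and df1 / final are the resulting dataframes of the first and second approach):
df1.explain("formatted")    # numbered physical plan, like the ones shown above
df1.explain("cost")         # optimized logical plan with Statistics(sizeInBytes=..., ...)
final.explain("formatted")
final.explain("cost")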
I used this post as a reference
Finally, considering that I'm applying a window, which as far as I understand means bringing everything to one single node, I validated the number of partitions in each method. The original had 4 partitions and the new one had 1 partition. Considering that it is a window, and that I saw there was still one Exchange, I applied a coalesce. It returned the same query plan, but with a Coalesce instead of the Exchange.
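Roughly, the change looks like this (a sketch; that coalesce(1) was applied right after the filtered read, before the window, is an assumption based on the plan below):
final = table1.coalesce(1) \
    .withColumn("recent_value", max("DATE").over(windowSpec)) \
    .filter(col("DATE") == col("recent_value")) \
    .select("col1", "col2", "col3", "col4")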
The plan becomes the following:
== Physical Plan ==
AdaptiveSparkPlan (8)
+- Project (7)
   +- Filter (6)
      +- Window (5)
         +- Sort (4)
            +- Coalesce (3)
               +- Filter (2)
                  +- Scan parquet (1)
Finally, I made one last change to see how it affects the query plan: instead of selecting the needed columns at the end, I selected them at the beginning, as sketched below.
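A rough sketch of that variant (the extra columns in the select are an assumption, since the window and the filter still need rule1, rule2 and DATE):
final = table1.select("col1", "col2", "col3", "col4", "rule1", "rule2", "DATE") \
    .coalesce(1) \
    .withColumn("recent_value", max("DATE").over(windowSpec)) \
    .filter(col("DATE") == col("recent_value"))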
It returned the following plan:
== Physical Plan ==
AdaptiveSparkPlan (8)
+- Filter (7)
   +- Window (6)
      +- Sort (5)
         +- Coalesce (4)
            +- Project (3)
               +- Filter (2)
                  +- Scan parquet (1)
The only difference is the order in which the operations are executed.
So, my questions are the following (if you have a post or any link, feel free to share it):
First, correct me if I'm wrong: is the second approach better than the first (original) one? I'm fairly sure it is, as far as I understand, since it reduces the amount of data loaded as well as the number of steps in the query plan, but I want to be sure.
Second, since I'm using a window, I assumed it would be better to have a Coalesce instead of the Exchange, because the data does not need to be shuffled. Is the Coalesce better in this case, or is it better to keep the Exchange? And why?
Third, I know that the order in which operations are done matters, which leads me to one more question: is it better to first select the columns you need and then apply the filter, or to filter first and select afterwards? Say, for example, I use 4 columns out of 20, and the filter is applied on those 4 columns. Wouldn't it be better to select first so Spark only reads those four columns, or is it better to filter first and select after?
Finally, related to the third question: is it better to have the Project early in the query plan, or at the end? If it depends on the specific case, any tip on how to choose the best option is welcome.
Upvotes: 0
Views: 256
Reputation: 3260
The first approach performs a SortMergeJoin after filtering, which requires sorting and shuffling data across the cluster. It also involves multiple Exchange and SortAggregate operations. The second approach uses a window function, which avoids the SortMergeJoin and the extra Exchanges and makes it more efficient: the Window operation finds the maximum date per partition and a Filter keeps only the most recent records.
So it is better to use the window function, since it works on the partitioned and ordered data, which is more efficient than a self-join when looking for the most recent records in grouped data.
Regarding your question about whether it is better to use the Coalesce or to keep the Exchange, and why:
I have created an example and applied the window function to find the most recent purchase for each customer.
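For a self-contained snippet, the sample DataFrame can be created like this (the actual values are just an illustration I'm assuming; only the column names CustomerID, Product, PurchaseDate and Amount are taken from the plans below):
from pyspark.sql.functions import col, max
from pyspark.sql.window import Window

# hypothetical sample data matching the columns used in the window below
data = [
    (1, "Laptop",  "2024-01-05", 1200),
    (1, "Monitor", "2024-03-10", 300),
    (2, "Phone",   "2024-02-20", 800),
    (2, "Tablet",  "2024-02-18", 500),
]
df = spark.createDataFrame(data, ["CustomerID", "Product", "PurchaseDate", "Amount"])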
windowSpec = Window.partitionBy("CustomerID").orderBy(col("PurchaseDate").desc())
df_with_recent = df.withColumn("MostRecentPurchase", max("PurchaseDate").over(windowSpec)) \
    .filter(col("PurchaseDate") == col("MostRecentPurchase")) \
    .select("CustomerID", "Product", "PurchaseDate", "Amount")
Results:
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- == Initial Plan ==
   Project [CustomerID#2L, Product#3, PurchaseDate#4, Amount#5L]
   +- Filter ((isnotnull(PurchaseDate#4) AND isnotnull(MostRecentPurchase#10)) AND (PurchaseDate#4 = MostRecentPurchase#10))
      +- Window [CustomerID#2L, Product#3, PurchaseDate#4, Amount#5L, max(PurchaseDate#4) windowspecdefinition(CustomerID#2L, PurchaseDate#4 DESC NULLS LAST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS MostRecentPurchase#10], [CustomerID#2L], [PurchaseDate#4 DESC NULLS LAST]
         +- Sort [CustomerID#2L ASC NULLS FIRST, PurchaseDate#4 DESC NULLS LAST], false, 0
            +- Exchange hashpartitioning(CustomerID#2L, 200), ENSURE_REQUIREMENTS, [plan_id=129]
               +- Scan ExistingRDD[CustomerID#2L,Product#3,PurchaseDate#4,Amount#5L]
df_coalesced = df.coalesce(1)
window_spec = Window.partitionBy("CustomerID").orderBy(col("PurchaseDate").desc())
df_with_recent = df_coalesced.withColumn("recent_value", max("PurchaseDate").over(window_spec))
df_result = df_with_recent.filter(col("PurchaseDate") == col("recent_value"))
Results:
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- == Initial Plan ==
   Filter ((isnotnull(PurchaseDate#94) AND isnotnull(recent_value#100)) AND (PurchaseDate#94 = recent_value#100))
   +- Window [CustomerID#92L, Product#93, PurchaseDate#94, Amount#95L, max(PurchaseDate#94) windowspecdefinition(CustomerID#92L, PurchaseDate#94 DESC NULLS LAST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS recent_value#100], [CustomerID#92L], [PurchaseDate#94 DESC NULLS LAST]
      +- Sort [CustomerID#92L ASC NULLS FIRST, PurchaseDate#94 DESC NULLS LAST], false, 0
         +- Coalesce 1
            +- Scan ExistingRDD[CustomerID#92L,Product#93,PurchaseDate#94,Amount#95L]
In the above code, coalesce is applied to the DataFrame to reduce the number of partitions to one, and then the window function is applied to find the most recent purchase for each CustomerID; in the resulting plan the Exchange is replaced by a Coalesce 1.
Upvotes: 0