Reputation: 27
I would like to ask for help in evaluating two similar query plans in PySpark.
I tried to research this before asking, but I didn't find any useful information.
I have the following two query plans. They give the same result in the end, but I want to know which one is better and why.
In summary, the logic does the following:
from pyspark.sql.functions import col, max

attributes = spark.read.format('delta').load(path).select('col1', 'col2', 'col3')
attributes = attributes.groupBy('col1').pivot('col2', values = ['value1', 'value2']).agg(max('col3'))
attributes = attributes.withColumn('new', col('value1')).withColumn('new2', col('value2'))
attributes = attributes.select('col1', 'new', 'new2')
attributes = attributes.dropna(how = 'all', subset = ['new', 'new2'])
# The dropna is because values of col2 that are not in the set given to pivot are not required.
The original query plan is the following:
== Physical Plan ==
AdaptiveSparkPlan (10)
+- == Initial Plan ==
Project (9)
+- Filter (8)
+- SortAggregate (7)
+- Sort (6)
+- Exchange (5)
+- SortAggregate (4)
+- Sort (3)
+- Project (2)
+- Scan parquet (1)
Consider that the .withColumn chain had more columns than the ones presented, and the pivot column has many more values than are really necessary.
I learned that doing many nested withColumn calls can be bad practice when the same thing can be done inside a select and the logic is not too complicated, because that reduces the number of nested projections.
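For illustration, here is a minimal sketch of that change (using the same placeholder column names as above), just to show the equivalence between the two forms:
# Chained withColumn calls: each call adds another projection on top of the plan
attributes = attributes.withColumn('new', col('value1')).withColumn('new2', col('value2'))
# Equivalent single select: one projection that builds all the new columns at once
attributes = attributes.select('col1', col('value1').alias('new'), col('value2').alias('new2'))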
So I did the following:
attributes = spark.read.format('delta').load(path).select('col1', 'col2', 'col3').where("col2 in ('value1', 'value2')")
attributes = attributes.groupBy('col1').pivot('col2').agg(max('col3')).selectExpr('col1', 'value1 as new', 'value2 as new2')
I did the filter in the first line, instead of passing the whole list of values to pivot, to avoid the dropna and to reduce the amount of data that would have to be projected. It generated the following query plan:
== Physical Plan ==
AdaptiveSparkPlan (10)
+- == Initial Plan ==
Project (9)
+- SortAggregate (8)
+- Sort (7)
+- Exchange (6)
+- SortAggregate (5)
+- Sort (4)
+- Project (3)
+- Filter (2)
+- Scan parquet (1)
As you can see, the plans are pretty similar. The difference is that in the second one the filter operation is done before the project, whereas the original does a project first and then the filter.
I know that it is better to filter before projecting when it is possible (correct me if I'm wrong).
That being said, it made me wonder whether there was a real difference. For that reason I got the statistics from the cost explain mode to see whether there was any difference between the intermediate steps, which indeed there was.
I share both with you.
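For reference, this is how those statistics were obtained:
# Prints the optimized logical plan annotated with Statistics(sizeInBytes=...) per node
attributes.explain(mode='cost')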
The first and original plan had the following:
1. Relation -> Statistics(sizeInBytes=3.2 GiB, ColumnStat: N/A)
2. Project -> Statistics(sizeInBytes=430.2 MiB, ColumnStat: N/A)
3. Aggregate -> Statistics(sizeInBytes=1147.1 MiB, ColumnStat: N/A)
4. Filter -> Statistics(sizeInBytes=1147.1 MiB, ColumnStat: N/A)
5. Project -> Statistics(sizeInBytes=645.2 MiB, ColumnStat: N/A)
The second and new plan had the following:
1. Relation -> Statistics(sizeInBytes=3.2 GiB, ColumnStat: N/A)
2. Filter -> Statistics(sizeInBytes=3.2 GiB, ColumnStat: N/A)
3. Project -> Statistics(sizeInBytes=430.2 MiB, ColumnStat: N/A)
4. Aggregate -> Statistics(sizeInBytes=1147.1 MiB, ColumnStat: N/A)
5. Project -> Statistics(sizeInBytes=645.2 MiB, ColumnStat: N/A)
Even though the final DataFrame is the same for both query plans, I understand that the second one is better because it filters before projecting and aggregating, so the later steps process less data and the extra post-aggregation Filter disappears.
So, is my understanding correct? If not, could you tell me why?
Finally, I'm aware that the AdaptiveSparkPlan node means that how the query is executed could still change at runtime. I'm still researching and testing how that would affect things, but for now I want to focus on the interpretation at this point before going deeper into the AdaptiveSparkPlan.
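While I keep researching it, here is a small sketch of how AQE can be checked or temporarily turned off so that explain() only shows the static plan (spark.sql.adaptive.enabled is the relevant config flag):
# Check whether Adaptive Query Execution is enabled in this session
spark.conf.get('spark.sql.adaptive.enabled')
# Temporarily disable it to compare only the static (non-adaptive) plans
spark.conf.set('spark.sql.adaptive.enabled', 'false')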
Upvotes: 0
Views: 36
Reputation: 8160
Yes, what you are saying about filtering is correct.
It removes unnecessary records before the later operations, so it reduces the number of records going into the groupBy.
But for the best plan, the filter is not the only thing: you also need to focus on other aspects of optimization.
Check the operations that involve shuffling; in your case that is the groupBy, so repartition on the groupBy column to optimize it (a sketch is shown after the sample code below).
Cache the dataframe after reading it.
Sample code
from pyspark.sql.functions import max

attributes2 = spark.read.format('delta').load(output_path).select('col1', 'col2', 'col3').where("col2 in ('value1', 'value2')")
attributes2.cache()
attributes2.count()
attributes2 = attributes2.groupBy('col1').pivot('col2').agg(max('col3')).selectExpr('col1', 'value1 as new', 'value2 as new2')
attributes2.explain(mode="cost")
After this, any further operation does not re-evaluate from the source; it just reads from the cache.
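A minimal sketch of the repartition suggestion mentioned earlier (assuming the same column names; whether it actually helps depends on your data distribution):
# Repartition by the groupBy key so the shuffle happens once here and the rows
# for each key are already co-located when the aggregation runs
attributes2 = attributes2.repartition('col1')
attributes2 = attributes2.groupBy('col1').pivot('col2').agg(max('col3'))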
If isFinalPlan shows as true, then you have the optimized plan and no further optimization is done:
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Initial Plan ==
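Note that isFinalPlan only switches to true after the query has actually been executed; a small sketch:
attributes2.count()    # trigger execution so AQE can finalize the plan
attributes2.explain()  # the output now shows AdaptiveSparkPlan isFinalPlan=true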
Upvotes: 0