Alex

Reputation: 27

Similar logical and physical plans in PySpark. How to choose one?

I would like to ask for your help in providing information that could help me evaluate two similar query plans in PySpark.

I tried to research before asking, but I didn't find any useful information.

I have the following two query plans. Both give the same result in the end, but I want to know which is better and why.

The logic, in summary, does the following:

  1. Select specific columns.
  2. Apply a pivot operation over specific values and compute an aggregation.
  3. Compute new columns using the results of the pivot operation. The original code does something like this:
from pyspark.sql.functions import col, max

attributes = spark.read.format('delta').load(path).select('col1', 'col2', 'col3')
attributes = attributes.groupBy('col1').pivot('col2', values=['value1', 'value2']).agg(max('col3'))
attributes = attributes.withColumn('new', col('pivotvalue')).withColumn('new2', col('pivotvalue2'))
attributes = attributes.select('col1', 'new', 'new2')
attributes = attributes.dropna(how='all', subset=['new', 'new2'])
# The dropna is needed because values outside the set given to pivot are not required.

The original query plan is the following:

== Physical Plan ==
AdaptiveSparkPlan (10)
+- == Initial Plan ==
   Project (9)
   +- Filter (8)
      +- SortAggregate (7)
         +- Sort (6)
            +- Exchange (5)
               +- SortAggregate (4)
                  +- Sort (3)
                     +- Project (2)
                        +- Scan parquet  (1)

Consider that the .withColumn chain had more columns than shown, and that the pivot column has many values which are not actually needed.

I learned that chaining many withColumn calls can be bad practice when the same logic fits into a single select and is not too complicated, since that reduces the number of nested projections.

So I did the following:

attributes = spark.read.format('delta').load(path).select('col1', 'col2', 'col3').where("col2 in ('value1', 'value2')")
attributes = attributes.groupBy('col1').pivot('col2').agg(max('col3')).selectExpr('col1', 'value1 as new', 'value2 as new2')

I applied the filter in the first line instead of passing the whole value list, to avoid the dropna and to reduce the amount of data that would have to be projected. It generated the following query plan:

== Physical Plan ==
AdaptiveSparkPlan (10)
+- == Initial Plan ==
   Project (9)
   +- SortAggregate (8)
      +- Sort (7)
         +- Exchange (6)
            +- SortAggregate (5)
               +- Sort (4)
                  +- Project (3)
                     +- Filter (2)
                        +- Scan parquet  (1)

As you can see, the plans are pretty similar. The difference is that in the second the filter runs right after the scan, before the project, whereas in the original the project runs first and the filter only happens after the aggregation.

I know it is better to filter before projecting when possible (correct me if I'm wrong).

That being said, it made me wonder whether there was a real difference. So I pulled the full statistics from cost mode to compare the intermediate steps, and there the difference does show.

I share both with you.

The first and original plan had the following:

1. Relation -> Statistics(sizeInBytes=3.2 GiB, ColumnStat: N/A)
2. Project -> Statistics(sizeInBytes=430.2 MiB, ColumnStat: N/A)
3. Aggregate -> Statistics(sizeInBytes=1147.1 MiB, ColumnStat: N/A)
4. Filter -> Statistics(sizeInBytes=1147.1 MiB, ColumnStat: N/A)
5. Project -> Statistics(sizeInBytes=645.2 MiB, ColumnStat: N/A)

The second and new plan had the following:

1. Relation -> Statistics(sizeInBytes=3.2 GiB, ColumnStat: N/A)
2. Filter -> Statistics(sizeInBytes=3.2 GiB, ColumnStat: N/A)
3. Project -> Statistics(sizeInBytes=430.2 MiB, ColumnStat: N/A)
4. Aggregate -> Statistics(sizeInBytes=1147.1 MiB, ColumnStat: N/A)
5. Project -> Statistics(sizeInBytes=645.2 MiB, ColumnStat: N/A)

Even though the final DataFrame is equal for both query plans, I understand that the second one is better for the following reasons:

  1. Filter position: the filter is done early, in step 2, compared with step 8 in the original.
  2. Data reduction: filtering earlier means the plan shrinks the data from 3.2 GiB to 430.2 MiB before doing the project. The original plan, on the other hand, projects the full 3.2 GiB, which means the reduction effectively happens only after the pivot aggregation has processed all the values, whereas the second plan reduces the data before the pivot is applied.

So, is my understanding correct? If not, could you tell me why?

Finally, I'm aware that AdaptiveSparkPlan means the way the plan is executed can change at runtime. I'm still researching and testing how that would affect things, but for now I want to focus on the interpretation at this point before going deeper into AdaptiveSparkPlan.

Upvotes: 0

Views: 36

Answers (1)

JayashankarGS

Reputation: 8160

Yes, what you said about filtering is correct.

It removes unnecessary records from the later operations, which reduces the number of records going into the groupBy.

But for the best plan, don't focus only on the filter; there are other aspects to optimize:

  1. Check the operations that involve shuffling; in your case it's the groupBy. Repartition on the groupBy column to optimize it.

  2. Cache the DataFrame after reading it.

Sample code

from pyspark.sql.functions import max

attributes2 = spark.read.format('delta').load(output_path).select('col1', 'col2', 'col3').where("col2 in ('value1', 'value2')")
attributes2.cache()
attributes2.count()  # an action is needed to materialize the cache
attributes2 = attributes2.groupBy('col1').pivot('col2').agg(max('col3')).selectExpr('col1', 'value1 as new', 'value2 as new2')
attributes2.explain(mode="cost")

After this, no later operation re-evaluates the read; it is served from the cache instead.

  3. Next, when you see isFinalPlan=true, you have the final optimized plan and no further optimization will be done.
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Initial Plan ==

Upvotes: 0
