Jong

Reputation: 33

Partition Pruning in Merge Into statement

How does partition pruning work on a merge into statement?

I have a delta table that is partitioned by year, month and day. I'm trying to merge data into this table on all three partition columns plus an extra column (an ID). My merge statement is below:

MERGE INTO delta.<path of delta table> oldData 
using df newData 
on oldData.year = '2023' and oldData.month = '10' and oldData.day = '12' and oldData.clientid = newData.clientid
WHEN MATCHED THEN DELETE

When I do an explain on this query I get this plan:

== Physical Plan ==
Execute MergeIntoCommandEdge
   +- MergeIntoCommandEdge SubqueryAlias newData, SubqueryAlias oldData, Delta[version=4, ... , (((year#3761220 = 2023) AND (month#3761221 = 10)) AND ((day#3761222 = 12) AND (clientid#3761212 = clientid#3532751)))

This query runs for a long time considering that the data I'm trying to process is, I think, small (less than 1M). Also, based on this link I should see some mention of the partitions in the physical plan, but I don't. Any ideas why my merge statement does not seem to use the partitions when executing?

This is on Azure Databricks, by the way. Thanks!

I have tried using the partition columns as conditions in the merge statement so that partition pruning would happen, but based on the physical plan and some test runs it didn't.

Upvotes: 3

Views: 1464

Answers (1)

Paweł Tajs

Reputation: 482

The link you are referring to shows the actual physical execution plan. The plain EXPLAIN command, on the other hand, only shows a plan produced from the parsed SQL statement. It can only parse and analyze the statement itself; it is not aware of the partitioning of the underlying tables or of how they will be accessed.

Let me show you an example similar to your use case.

The code below creates a Delta table saved as default.test_merge. The partitioning scheme and the distribution of the data ensure that 12 files are written into the Delta table.

import pyspark.sql.types as t
from delta.tables import DeltaTable

table_name = "default.test_merge"

schema = t.StructType([
    t.StructField("year", t.IntegerType(), True),
    t.StructField("month", t.IntegerType(), True),
    t.StructField("day", t.IntegerType(), True),
    t.StructField("clientid", t.IntegerType(), True),
])

DeltaTable.createIfNotExists(spark) \
    .tableName(table_name) \
    .addColumns(schema) \
    .partitionedBy('year', 'month', 'day') \
    .execute()

data = [(2023, 1, 12, 333),
        (2023, 2, 12, 333),
        (2023, 3, 12, 333),
        (2023, 4, 12, 333),
        (2023, 5, 12, 333),
        (2023, 6, 12, 333),
        (2023, 7, 12, 777),
        (2023, 8, 12, 777),
        (2023, 9, 12, 777),
        (2023, 10, 12, 777),
        (2023, 11, 12, 777),
        (2023, 12, 12, 777)]

df = spark.createDataFrame(data=data, schema=schema)
df.write.mode("overwrite").insertInto(table_name)
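
As a quick, optional sanity check (my own addition, not part of the original example), Delta's DESCRIBE DETAIL command reports the file count and partition columns of the table we just wrote:

# Optional check: DESCRIBE DETAIL shows the number of files and the partition columns.
spark.sql(f"DESCRIBE DETAIL {table_name}") \
    .select("numFiles", "partitionColumns") \
    .show(truncate=False)
# Expected here: numFiles = 12, partitionColumns = [year, month, day]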

We also have a DataFrame registered as a temporary view df_tmp_view to serve as the incoming dataset. We will attempt to delete only one row with MERGE and make use of partition elimination.

schema = t.StructType([
    t.StructField("dummy", t.StringType(), True),    
    t.StructField("clientid", t.IntegerType(), True),
])

data = [('AAA', 1),('BBB', 777),('CCC', 3)]

df = spark.createDataFrame(data=data, schema=schema)

df.createOrReplaceTempView("df_tmp_view")
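
If you want to peek at the incoming dataset (an illustrative extra, not part of the original example), you can display the view; only the row with clientid 777 will match anything in the target partition:

# Show the incoming rows; only clientid 777 exists in the year=2023/month=10/day=12 partition of the target.
spark.table("df_tmp_view").show()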

Before we run the MERGE statement we can check the plans generated from parsing with EXPLAIN EXTENDED. There are four of them, but let's take a look at the first one, == Parsed Logical Plan ==.

EXPLAIN EXTENDED
MERGE INTO default.test_merge oldData
using df_tmp_view newData
on oldData.year = '2023' and oldData.month = '10' and oldData.day = '12' and oldData.clientid= newData.clientid
WHEN MATCHED THEN DELETE

The results show that a merge will certainly be executed, but there is no information about how the data is going to be accessed.

== Parsed Logical Plan ==
'MergeIntoTable ((('oldData.year = 2023) AND ('oldData.month = 10)) AND (('oldData.day = 12) AND ('oldData.clientid = 'newData.clientid))), [deleteaction(None)]
:- 'SubqueryAlias oldData
:  +- 'UnresolvedRelation [default, test_merge], [], false
+- 'SubqueryAlias newData
   +- 'UnresolvedRelation [df_tmp_view], [], false
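
If you prefer a Python notebook cell over a SQL cell, the same plans can be printed through spark.sql; this is just a sketch of one way to do it, not something from the original answer:

# EXPLAIN returns a one-row DataFrame whose 'plan' column holds the text of the plans.
explain_sql = """
EXPLAIN EXTENDED
MERGE INTO default.test_merge oldData
USING df_tmp_view newData
ON oldData.year = '2023' AND oldData.month = '10' AND oldData.day = '12'
   AND oldData.clientid = newData.clientid
WHEN MATCHED THEN DELETE
"""
print(spark.sql(explain_sql).first().plan)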

Now we can finally execute the statement. To get information about the actually executed plan you need to investigate the query details in the Spark UI. Here is what it shows for me.

== Physical Plan ==
AdaptiveSparkPlan (48)
+- == Final Plan ==
   SerializeFromObject (27)
   +- MapPartitions (26)
      +- DeserializeToObject (25)
         +- * Project (24)
            +- ObjectHashAggregate (23)
               +- AQEShuffleRead (22)
                  +- ShuffleQueryStage (21), Statistics(sizeInBytes=224.0 B, rowCount=1, isRuntime=true)
                     +- Exchange (20)
                        +- ObjectHashAggregate (19)
                           +- ObjectHashAggregate (18)
                              +- AQEShuffleRead (17)
                                 +- ShuffleQueryStage (16), Statistics(sizeInBytes=224.0 B, rowCount=1, isRuntime=true)
                                    +- Exchange (15)
                                       +- ObjectHashAggregate (14)
                                          +- * Project (13)
                                             +- * BroadcastHashJoin Inner BuildRight (12)
                                                :- * Project (4)
                                                :  +- * Filter (3)
                                                :     +- * Project (2)
                                                :        +- * Scan ExistingRDD mergeMaterializedSource (1)
                                                +- ShuffleQueryStage (11), Statistics(sizeInBytes=168.0 B, rowCount=1, isRuntime=true)
                                                   +- Exchange (10)
                                                      +- * Project (9)
                                                         +- * Filter (8)
                                                            +- * Project (7)
                                                               +- * ColumnarToRow (6)
                                                                  +- Scan parquet spark_catalog.default.test_merge (5)

At the very bottom we see the scan of the default.test_merge Delta table. Additional information is also provided for this step (5).

(5) Scan parquet spark_catalog.default.test_merge
Output [6]: [clientid#2383, _databricks_internal_edge_computed_column_row_index#2423L, year#2380, month#2381, day#2382, file_path#2568]
Batched: true
Location: TahoeFileIndexStoringMatchedFiles [dbfs:/user/hive/warehouse/test_merge]
PartitionFilters: [isnotnull(year#2380), isnotnull(month#2381), isnotnull(day#2382), (year#2380 = 2023), (month#2381 = 10), (day#2382 = 12)]
PushedFilters: [IsNotNull(clientid)]
ReadSchema: struct<clientid:int,_databricks_internal_edge_computed_column_row_index:bigint>

Here you can see that PartitionFilters are applied as expected, and in the metrics corresponding to this step we see that only one partition was scanned.

[Screenshot: Spark UI metrics for the scan step, showing a single partition read.]
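
As an additional check beyond the Spark UI (my own suggestion, not from the original answer), the Delta table history records operation metrics for the MERGE, which also reflect how little data was touched:

# The latest history entry is the MERGE; operationMetrics contains counters such as the
# number of target files scanned and removed (exact metric names vary by runtime version).
spark.sql("DESCRIBE HISTORY default.test_merge") \
    .select("version", "operation", "operationMetrics") \
    .show(1, truncate=False)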

Upvotes: 2
