Reputation: 33
How does partition pruning work on a merge into statement?
I have a Delta table that is partitioned by year, month and day. I'm trying to merge data into it on all three partition columns plus an extra column (an ID). My merge statement is below:
MERGE INTO delta.<path of delta table> oldData
using df newData
on oldData.year = '2023' and oldData.month = '10' and oldData.day = '12' and oldData.clientid = newData.clientid
WHEN MATCHED THEN DELETE
When I do an explain on this query I get this plan:
== Physical Plan ==
Execute MergeIntoCommandEdge
+- MergeIntoCommandEdge SubqueryAlias newData, SubqueryAlias oldData, Delta[version=4, ... , (((year#3761220 = 2023) AND (month#3761221 = 10)) AND ((day#3761222 = 12) AND (clientid#3761212 = clientid#3532751)))
This query runs for a long time even though the data I'm trying to process is small (less than 1M). Also, based on this link I should see some mention of partitioning in the physical plan, but I don't. Any ideas why my merge statement does not seem to use the partitions when it executes?
This is on Azure Databricks, by the way. Thanks!
I have tried using the partition columns as conditions in the merge statement so that partition pruning would happen, but based on the physical plan and some test runs it didn't.
Upvotes: 3
Views: 1464
Reputation: 482
The link you are referring to shows the physical execution plan. A simple EXPLAIN
command, on the other hand, only shows a plan produced from parsing the SQL statement. It can only parse and analyze the statement itself; it is not aware of the partitioning of the underlying tables or of the way they are going to be accessed.
Let me show you an example similar to your use case.
The code below creates a Delta table saved as default.test_merge
. The partitioning scheme and the distribution of the data ensure that 12 files are written into the Delta table.
import pyspark.sql.types as t
from delta.tables import DeltaTable

table_name = "default.test_merge"

# Target table partitioned by year, month and day
schema = t.StructType([
    t.StructField("year", t.IntegerType(), True),
    t.StructField("month", t.IntegerType(), True),
    t.StructField("day", t.IntegerType(), True),
    t.StructField("clientid", t.IntegerType(), True),
])

DeltaTable.createIfNotExists(spark) \
    .tableName(table_name) \
    .addColumns(schema) \
    .partitionedBy('year', 'month', 'day') \
    .execute()

# 12 rows, one per (year, month, day) combination -> 12 files in the table
data = [(2023, 1, 12, 333),
        (2023, 2, 12, 333),
        (2023, 3, 12, 333),
        (2023, 4, 12, 333),
        (2023, 5, 12, 333),
        (2023, 6, 12, 333),
        (2023, 7, 12, 777),
        (2023, 8, 12, 777),
        (2023, 9, 12, 777),
        (2023, 10, 12, 777),
        (2023, 11, 12, 777),
        (2023, 12, 12, 777)]

df = spark.createDataFrame(data=data, schema=schema)
df.write.mode("overwrite").insertInto(table_name)
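If you want to double-check the layout, one way (a small sketch; DESCRIBE DETAIL reports, among other things, the number of files and the partition columns of a Delta table) is:
# With one row per (year, month, day) combination we expect numFiles = 12
spark.sql(f"DESCRIBE DETAIL {table_name}") \
    .select("numFiles", "partitionColumns") \
    .show(truncate=False)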
We also have a DataFrame saved as the temporary view df_tmp_view
to serve as the incoming dataset. We attempt to delete only one row with MERGE
and rely on partition elimination.
schema = t.StructType([
    t.StructField("dummy", t.StringType(), True),
    t.StructField("clientid", t.IntegerType(), True),
])

# Of the incoming client ids only 777 exists in the target table, so together with the
# partition predicates exactly one row, (2023, 10, 12, 777), will match.
data = [('AAA', 1), ('BBB', 777), ('CCC', 3)]
df = spark.createDataFrame(data=data, schema=schema)
df.createOrReplaceTempView("df_tmp_view")
Before we run the MERGE
statement we can check the plans generated from parsing with EXPLAIN EXTENDED
. There are four of them, but let's take a look at the first one, called == Parsed Logical Plan ==
.
EXPLAIN EXTENDED
MERGE INTO default.test_merge oldData
using df_tmp_view newData
on oldData.year = '2023' and oldData.month = '10' and oldData.day = '12' and oldData.clientid= newData.clientid
WHEN MATCHED THEN DELETE
The result shows that a merge will certainly be executed, but gives no information about how the data is going to be accessed.
== Parsed Logical Plan ==
'MergeIntoTable ((('oldData.year = 2023) AND ('oldData.month = 10)) AND (('oldData.day = 12) AND ('oldData.clientid = 'newData.clientid))), [deleteaction(None)]
:- 'SubqueryAlias oldData
: +- 'UnresolvedRelation [default, test_merge], [], false
+- 'SubqueryAlias newData
+- 'UnresolvedRelation [df_tmp_view], [], false
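For reference, the merge itself is the same statement without the EXPLAIN EXTENDED prefix; a minimal sketch of running it from PySpark (assuming the table and the df_tmp_view created above) is:
# Run the actual MERGE; only the row (2023, 10, 12, 777) should match and be deleted.
spark.sql("""
    MERGE INTO default.test_merge oldData
    USING df_tmp_view newData
    ON oldData.year = '2023' AND oldData.month = '10' AND oldData.day = '12'
       AND oldData.clientid = newData.clientid
    WHEN MATCHED THEN DELETE
""")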
Now we can finally execute the statement and look at the plan that was actually used. To get that information you need to investigate the query details in the Spark UI. Here is what it shows for me.
== Physical Plan ==
AdaptiveSparkPlan (48)
+- == Final Plan ==
SerializeFromObject (27)
+- MapPartitions (26)
+- DeserializeToObject (25)
+- * Project (24)
+- ObjectHashAggregate (23)
+- AQEShuffleRead (22)
+- ShuffleQueryStage (21), Statistics(sizeInBytes=224.0 B, rowCount=1, isRuntime=true)
+- Exchange (20)
+- ObjectHashAggregate (19)
+- ObjectHashAggregate (18)
+- AQEShuffleRead (17)
+- ShuffleQueryStage (16), Statistics(sizeInBytes=224.0 B, rowCount=1, isRuntime=true)
+- Exchange (15)
+- ObjectHashAggregate (14)
+- * Project (13)
+- * BroadcastHashJoin Inner BuildRight (12)
:- * Project (4)
: +- * Filter (3)
: +- * Project (2)
: +- * Scan ExistingRDD mergeMaterializedSource (1)
+- ShuffleQueryStage (11), Statistics(sizeInBytes=168.0 B, rowCount=1, isRuntime=true)
+- Exchange (10)
+- * Project (9)
+- * Filter (8)
+- * Project (7)
+- * ColumnarToRow (6)
+- Scan parquet spark_catalog.default.test_merge (5)
At the very bottom we see the scan of the default.test_merge
Delta table. Additional information is also provided for this step (5).
(5) Scan parquet spark_catalog.default.test_merge
Output [6]: [clientid#2383, _databricks_internal_edge_computed_column_row_index#2423L, year#2380, month#2381, day#2382, file_path#2568]
Batched: true
Location: TahoeFileIndexStoringMatchedFiles [dbfs:/user/hive/warehouse/test_merge]
PartitionFilters: [isnotnull(year#2380), isnotnull(month#2381), isnotnull(day#2382), (year#2380 = 2023), (month#2381 = 10), (day#2382 = 12)]
PushedFilters: [IsNotNull(clientid)]
ReadSchema: struct<clientid:int,_databricks_internal_edge_computed_column_row_index:bigint>
Here you can see that the PartitionFilters
are provided as expected, and in the metrics corresponding to this step we can see that only one partition was scanned.
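If you want to cross-check this without digging into the Spark UI, one option (a sketch; the exact metric names can differ between Delta Lake / Databricks Runtime versions) is to look at the operation metrics recorded in the table history:
# The MERGE entry in the table history records metrics such as numTargetRowsDeleted
# and numTargetFilesRemoved, which indicate how little of the table was touched.
spark.sql("DESCRIBE HISTORY default.test_merge") \
    .select("version", "operation", "operationMetrics") \
    .show(truncate=False)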
Upvotes: 2