Reputation: 11
I want to do a merge on a subset of my delta table partitions to do incremental upserts and keep two tables in sync. I do not use a whenNotMatchedBySource clause to clean up stale rows in my target because of this GitHub issue.
Because of that, my first step focuses only on updates and inserts, and it is in this first step that I am facing an issue I need help with. As a prerequisite for the merge, I extract the most recent, relevant partitions (the last X partitions) of the dataframe that I want to sync my delta table with (the other partitions stay as they are for the rest of the time the data is kept) and create a Spark dataframe called "updates" that contains only the relevant partitions, in order to "prune" the source. (Side note: the delta table and the Spark dataframe have equivalent schemas.)
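For illustration, here is a minimal sketch of how such a merge condition string could be assembled from the partitions present in the source; the helper name build_merge_condition, the sample partition values, and the string quoting are my assumptions, not part of the original setup:

```python
# Hypothetical helper (assumed, not from the original post): build the
# merge condition string from the list of partitions present in the source.
def build_merge_condition(partitions):
    # Quote each partition value for the SQL IN (...) list.
    partition_list = ", ".join(f"'{p}'" for p in partitions)
    return (
        f"target.PARTITION IN ({partition_list}) "
        "AND target.HASHKEY = source.HASHKEY"
    )

# In a real session the partitions would come from the source dataframe,
# e.g.: [r[0] for r in updates.select("PARTITION").distinct().collect()]
cond = build_merge_condition(["2024-01", "2024-02"])
print(cond)
```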
My merge statement looks like this:
deltaTable_target.alias("target").merge(
    df_subset_relevant.alias("source"),
    f"target.PARTITION IN ({partition_list}) AND target.HASHKEY = source.HASHKEY"
).whenNotMatchedInsertAll(
).whenMatchedUpdateAll(
    condition="target.col_that_can_change <> source.col_that_can_change"
).execute()
The merge statement itself works correctly according to the delta table's log files (everything is inserted and updated correctly). Yet the merge marks files as touched (and rewrites them) even when they contain no changes at all; to be more specific, I know that one of the partitions is exactly the same between source and target, and it still gets rewritten completely.
My assumption is that this is forced by the partition-pruning predicate in the merge condition itself. At least I cannot imagine any other reason for it.
In an ideal world, the numTargetRowsCopied value in the logs should correspond to the size of the partitions that actually changed, not to the size of all partitions that are part of the merge, am I correct?
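For reference, this is roughly how I read the merge metrics. The sketch below uses a made-up sample dict with the same shape as a real operationMetrics entry (Delta stores metric values as strings); in a live session it would come from the table history, assuming the delta-spark Python API:

```python
# Sketch: inspecting MERGE operation metrics from the Delta log.
# In a live session you would fetch them with something like:
#   last = deltaTable_target.history(1).collect()[0]
#   metrics = last["operationMetrics"]
# The dict below is a made-up sample of the same shape.
metrics = {
    "numTargetRowsCopied": "150000",
    "numTargetRowsUpdated": "1200",
    "numTargetRowsInserted": "300",
    "numTargetFilesRemoved": "12",
    "numTargetFilesAdded": "12",
}

copied = int(metrics["numTargetRowsCopied"])
updated = int(metrics["numTargetRowsUpdated"])
inserted = int(metrics["numTargetRowsInserted"])

# "Copied" rows are unchanged rows that were rewritten only because they
# live in a file that contained at least one matched row.
print(f"copied={copied} updated={updated} inserted={inserted}")
```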
I mean, maybe I do not understand the inner workings correctly and this is a necessary step. Any insight into this problem is highly appreciated.
Upvotes: 0
Views: 31