Reputation: 83
I want to run a Merge operation on two Delta tables, but I'd rather not write out explicit insert/update column mappings, so ideally I'd use InsertAll() and UpdateAll(). That works well, except that my source table contains an extra column that I don't want to write to the target. I can't simply drop it either, because I need it to identify records that should be deleted.
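Roughly what I'm doing now (table and column names are placeholders):

from delta.tables import DeltaTable

target = DeltaTable.forName(spark, "target_table")  # placeholder name

(target.alias("t")
    .merge(source_df.alias("s"), "t.id = s.id")
    .whenMatchedUpdateAll()       # copies every source column, including the extra one
    .whenNotMatchedInsertAll()
    .execute())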
I found in the documentation that there is an EXCEPT keyword I could use. There is a sample written in SQL:
MERGE INTO target t
USING source s
ON t.id = s.id
WHEN MATCHED
THEN UPDATE SET last_updated = current_date()
WHEN NOT MATCHED
THEN INSERT * EXCEPT (last_updated)
Is there any way I could use the same in PySpark?
I know that I could write custom insert and update clauses, but that causes problems with schema evolution that don't happen when using InsertAll() / UpdateAll().
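For reference, I rely on automatic schema evolution during the merge, which (as far as I understand) only kicks in for the *All variants:

# automatic schema evolution for MERGE; per the Delta docs it only
# applies automatically to updateAll() / insertAll()
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")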
Upvotes: 0
Views: 1749
Reputation: 87259
Unfortunately, there is no such functionality in PySpark yet, but it's easy enough to implement programmatically, something like this:
from pyspark.sql.functions import col, expr

# `deltaTable` is the target DeltaTable, `updatesDF` is the source DataFrame
update_df_name = "updates"
excluded_columns = ["col1", "col2"]

# build the column -> value mapping from the source columns,
# leaving out the ones you don't want to write to the target
values_to_insert = {cl: col(f"{update_df_name}.{cl}")
                    for cl in updatesDF.columns
                    if cl not in excluded_columns}

deltaTable.alias("events").merge(
    source=updatesDF.alias(update_df_name),
    condition=expr("events.eventId = updates.eventId")
).whenNotMatchedInsert(
    values=values_to_insert
).execute()
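If you also need the update branch, the same mapping should work with whenMatchedUpdate (untested sketch):

deltaTable.alias("events").merge(
    source=updatesDF.alias(update_df_name),
    condition=expr("events.eventId = updates.eventId")
).whenMatchedUpdate(
    set=values_to_insert
).whenNotMatchedInsert(
    values=values_to_insert
).execute()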
Upvotes: 1