A. Frank

Reputation: 83

Delta merge using InsertAll() / UpdateAll() and EXCEPT keyword

I want to use a MERGE operation on two Delta tables, but I don't want to write complex insert / update conditions, so ideally I'd like to use InsertAll() and UpdateAll(). That works well, but my source table contains an extra column that I don't want to propagate to the target table, and I can't drop it because I need it to identify the records to be deleted.
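For reference, this is roughly what I have today (table, DataFrame, and column names are placeholders); extra_col exists only in the source and is what I use to identify the records to delete:

from delta.tables import DeltaTable

target = DeltaTable.forName(spark, "target")

(target.alias("t")
   .merge(source_df.alias("s"), "t.id = s.id")
   .whenMatchedUpdateAll()      # copies every source column, including extra_col
   .whenNotMatchedInsertAll()   # same problem on the insert side
   .execute())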

I found in the documentation that there is an EXCEPT keyword I could use. Here is a sample written in SQL:

MERGE INTO target t
USING source s
ON t.id = s.id
WHEN MATCHED
  THEN UPDATE SET last_updated = current_date()
WHEN NOT MATCHED
  THEN INSERT * EXCEPT (last_updated)

Is there any way I could use the same in PySpark?

I know that I could write custom insert and update conditions, but that causes problems with schema evolution that don't happen when using InsertAll() / UpdateAll().
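To make the schema-evolution point concrete: with Delta's automatic schema merge enabled, InsertAll() / UpdateAll() pick up new source columns without any code change, while an explicit column map has to be edited every time the schema changes:

# Delta Lake setting that lets MERGE evolve the target schema
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")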

Upvotes: 0

Views: 1749

Answers (1)

Alex Ott

Reputation: 87259

Unfortunately, there is no such functionality in PySpark yet. But it's quite easy to implement it programmatically, something like this:

from pyspark.sql.functions import col, expr

update_df_name = "updates"
excluded_columns = ["col1", "col2"]
# build the insert map from every source column except the excluded ones
values_to_insert = {cl: col(f"{update_df_name}.{cl}")
                    for cl in updatesDF.columns
                    if cl not in excluded_columns}

deltaTable.alias("events").merge(
    source = updatesDF.alias(update_df_name),
    condition = expr("events.eventId = updates.eventId")
  ).whenNotMatchedInsert(
     values = values_to_insert  # inserts everything except the excluded columns
  ).execute()
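If you also need the matched branch, the same dictionary can be passed to whenMatchedUpdate, something like this (untested sketch):

deltaTable.alias("events").merge(
    source = updatesDF.alias(update_df_name),
    condition = expr("events.eventId = updates.eventId")
  ).whenMatchedUpdate(
     set = values_to_insert
  ).whenNotMatchedInsert(
     values = values_to_insert
  ).execute()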

Upvotes: 1
