chaitra k

Reputation: 421

Delta merge logic whenMatchedDelete case

I'm working on Delta merge logic and want to delete a row in the Delta table when that row is no longer present in the latest DataFrame read.

My sample DataFrame is shown below:

df = spark.createDataFrame(
    [
        ('Java', '20000'),   # create your data here, be consistent in the types.
        ('PHP', '40000'),
        ('Scala', '50000'),
        ('Python', '10000')
    ],
    ["language", "users_count"]  # add your column names here
)

Insert the data into a Delta table:

df.write.format("delta").mode("append").saveAsTable("xx.delta_merge_check")

On the next read, I've removed the row ('Python', '10000'), and now I want to delete this row from the Delta table using the Delta merge API.

df_latest = spark.createDataFrame(
    [
        ('Java', '20000'),   # create your data here, be consistent in the types.
        ('PHP', '40000'),
        ('Scala', '50000')
    ],
    ["language", "users_count"]  # add your column names here
)

I'm using the code below for the Delta merge API.

Read the existing Delta table:

from delta.tables import *

test_delta = DeltaTable.forPath(
    spark,
    "wasbs://[email protected]/hive/warehouse/xx/delta_merge_check"
)

Merge the changes:

test_delta.alias("t").merge(df_latest.alias("s"),
"s.language = t.language").whenMatchedDelete(condition = "s.language = 
true").whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()

But unfortunately this doesn't delete the row ('Python', '10000') from the Delta table. Is there any other way to achieve this? Any help would be much appreciated.

Upvotes: 4

Views: 13786

Answers (3)

Louis

Reputation: 11

@Roman answered your question already. However, using the Delta Lake Python API you can delete unmatched rows as follows:

(targetDF.alias("target")  # targetDF must be a DeltaTable, sourceDF a DataFrame
  .merge(sourceDF.alias("source"), "source.key = target.key")
  .whenNotMatchedBySourceDelete()
  .execute()
)

Here the .whenNotMatchedBySourceDelete() clause deletes records in the target table that have no matching records in the source table, based on the merge condition "source.key = target.key".

In your case the merge logic would look like this:

(test_delta.alias("t")
  .merge(df_latest.alias("s"), "s.language = t.language")
  .whenNotMatchedBySourceDelete()
  .execute()
)

You may also find yourself wanting to update rows when there is no match. As this is outside the scope of your question, you can find more info in the official Delta docs.
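For example, a minimal sketch (assuming Delta Lake 2.3+, where the whenNotMatchedBySource clauses are available) that zeroes out users_count for languages that disappeared from the source instead of deleting them; the '0' value is purely illustrative:

(test_delta.alias("t")
  .merge(df_latest.alias("s"), "s.language = t.language")
  .whenNotMatchedBySourceUpdate(set={"users_count": "'0'"})  # illustrative value
  .execute()
)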

Upvotes: 0

Roman Tkachuk

Reputation: 3276

Spark SQL recently added new syntax that allows you to delete data the way you want:

MERGE INTO target_table_name [target_alias]
   USING source_table_reference [source_alias]
   ON merge_condition
   { WHEN MATCHED [ AND matched_condition ] THEN matched_action |
     WHEN NOT MATCHED [BY TARGET] [ AND not_matched_condition ] THEN not_matched_action |
     WHEN NOT MATCHED BY SOURCE [ AND not_matched_by_source_condition ] THEN not_matched_by_source_action } [...]

So for WHEN NOT MATCHED BY SOURCE you can use DELETE.
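Applied to the question's tables it might look like this (a sketch; it assumes a runtime where this syntax is available, and the view name latest_languages is illustrative):

# Register the latest snapshot so SQL can see it
df_latest.createOrReplaceTempView("latest_languages")

spark.sql("""
    MERGE INTO xx.delta_merge_check AS t
    USING latest_languages AS s
    ON s.language = t.language
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
    WHEN NOT MATCHED BY SOURCE THEN DELETE
""")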

Full doc: https://docs.databricks.com/en/sql/language-manual/delta-merge-into.html

But it looks like the PySpark DeltaTable API does not support it yet.

Upvotes: 0

Alex Ott

Reputation: 87329

This won't work the way you think it should - the basic problem is that your second dataset doesn't carry any information that data was deleted, so you somehow need to add this information to it. There are different approaches, depending on the specific details:

  • Instead of just removing the row, keep it, but add another column that shows whether the row was deleted or not, something like this:

test_delta.alias("t").merge(
    df_latest.alias("s"),
    "s.language = t.language"
).whenMatchedDelete(
    condition="s.is_deleted = true"
).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
  • Use some other method to find the diff between your destination table and the input data - but this really depends on your logic. If you are able to calculate the diff, then you can use the approach described in the previous item (see the sketch after this list).

  • If your input data is always a full set of data, you can just overwrite everything using overwrite mode - this can even be more performant than merge, because you don't need to detect which rows changed (see the one-liner after this list).
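Putting the first two items together, a minimal sketch (my assumptions: language is the key, df_latest is the complete latest snapshot, and the table and test_delta variable from the question): derive the deleted rows with a left anti join against the current table contents, tag everything with an is_deleted flag, and drive the merge off that flag. Explicit update/insert clauses keep the helper is_deleted column out of the target table:

from pyspark.sql import functions as F

# Current state of the Delta table
df_current = spark.table("xx.delta_merge_check")

# Rows present in the table but absent from the latest read were deleted upstream
df_deleted = (df_current
    .join(df_latest, "language", "left_anti")
    .withColumn("is_deleted", F.lit(True)))

# Tag the surviving rows and combine them with the deleted ones
df_source = (df_latest
    .withColumn("is_deleted", F.lit(False))
    .unionByName(df_deleted))

(test_delta.alias("t")
    .merge(df_source.alias("s"), "s.language = t.language")
    .whenMatchedDelete(condition="s.is_deleted = true")
    .whenMatchedUpdate(set={"users_count": "s.users_count"})
    .whenNotMatchedInsert(values={"language": "s.language",
                                  "users_count": "s.users_count"})
    .execute())

And the third item is a one-liner, reusing the table name from the question:

df_latest.write.format("delta").mode("overwrite").saveAsTable("xx.delta_merge_check")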

Upvotes: 4
