Dan Wang

Reputation: 21

Delete / overwrite rows of data based on Matched Keys in Spark

I have 2 tables in a Lake Database in Synapse (Parquet), running in Spark: one table with 10 billion rows (tableA) and the other with 10 million rows (tableB). I want to delete the matching 10 million rows from tableA and insert/overwrite them with the new data from tableB.

This was my old SQL code from a stored proc, as a reference (not important), but I now need it in Spark code.

DELETE data1
FROM TableA data1
WHERE EXISTS (
   SELECT 1
   FROM TableB data2
   WHERE data1.Country = data2.Country
   AND data1.Year = data2.Year
   AND data1.Month = data2.Month
   AND data1.Store_cd = data2.Store_cd
   AND data1.SKU = data2.SKU
);

The usual Spark suggestion is to load the table as a DataFrame and filter it, which is not feasible with 10 billion rows.

I tried a simple DELETE and got: [UNSUPPORTED_FEATURE.TABLE_OPERATION] The feature is not supported: Table spark_catalog.dbo.table1 does not support DELETE. Please check the current catalog and namespace to make sure the qualified table name is expected, and also check the catalog implementation which is configured by "spark.sql.catalog".
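Update: it looks like DELETE/MERGE only work against Delta tables, so converting the parquet table in place seems to be the prerequisite. A minimal sketch in Spark SQL (dbo.tableA is an assumed name; adjust to your catalog/schema):

```sql
-- Convert the existing parquet table to Delta in place
-- (table name is an assumption)
CONVERT TO DELTA dbo.tableA;
```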

Upvotes: 0

Views: 107

Answers (2)

Dan Wang

Reputation: 21

For those new to Delta tables: a Delta table uses parquet files as its base from inception. The base files do not change; instead, delta logs are written on top of the parquet. In other words, with constant updates the delta log builds up and eventually takes considerable time to process the table.

from delta.tables import DeltaTable

(delta_table.alias("t1")
    .merge(source_df.alias("t2"), "t1.pk1 = t2.pk1 AND t1.pk2 = t2.pk2")
    .whenMatchedDelete()
    .execute())
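The merge above only deletes the matched rows; to also bring in the new rows from tableB, and to keep the log build-up mentioned above in check, a sketch in Spark SQL (table names follow the question; OPTIMIZE/VACUUM availability depends on your Delta Lake version, so treat this as an assumption):

```sql
-- Append the replacement data after the matched rows are deleted
INSERT INTO tableA SELECT * FROM tableB;

-- Periodically compact small files and trim old log/data files
OPTIMIZE tableA;
VACUUM tableA;  -- default retention is 7 days
```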

Upvotes: 0

Kashyap

Reputation: 17524

It depends on what kind of table you have (can you share the DDL?) and how it's partitioned.

If it's a Synapse table then you can use Synapse's MERGE T-SQL statement.

If using pure apache-spark syntax, you might be able to use DML commands such as INSERT INTO or LOAD DATA, depending on how your data is partitioned. You might end up re-writing the whole table (if, e.g., tableB has data for every partition in tableA).

E.g.

-- in an atomic operation, 1) delete rows with ssn = 123456789 and 
-- 2) insert rows from persons2 
INSERT INTO persons REPLACE WHERE ssn = 123456789 SELECT * FROM persons2

It might be easier, and a little more platform-independent, to use the delta table format as Jon suggested. In that case you can simply use the MERGE INTO statement. Something like:

MERGE INTO tableA
USING tableB
ON tableA.pk1 = tableB.pk1 AND tableA.pk2 = tableB.pk2
WHEN MATCHED
  THEN UPDATE SET
    pk1  = tableB.pk1,
    pk2  = tableB.pk2,
    col1 = tableB.col1,
    col2 = tableB.col2
WHEN NOT MATCHED
  THEN INSERT (
      ...
  )
  VALUES (
    tableB.pk1,
    tableB.pk2,
    tableB.col1,
    tableB.col2
  )
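If the goal is literally to delete the matched rows and then insert everything from tableB (as in the original stored proc), rather than updating in place, a hedged two-statement variant of the above (note it is not atomic across the two statements, unlike a single MERGE):

```sql
-- 1) delete rows in tableA that have a match in tableB
MERGE INTO tableA
USING tableB
ON tableA.pk1 = tableB.pk1 AND tableA.pk2 = tableB.pk2
WHEN MATCHED THEN DELETE;

-- 2) append the new data
INSERT INTO tableA SELECT * FROM tableB;
```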

Upvotes: 1
