RAH

Reputation: 476

Databricks DLT and CDC When Underlying Data Changed

I have a DLT pipeline where the underlying data is stored in parquet format in S3. This data may be updated as well as appended to. Setting aside SCD and looking only at CDC, I am trying to find the SQL syntax that will allow the bronze and subsequent silver child tables to bring in appended data from the source and update existing rows when they detect a change.

Databricks' own documentation on this situation (https://docs.databricks.com/en/delta-live-tables/cdc.html#language-sql) says to define the source and then apply changes, using the following syntax:

CREATE OR REFRESH STREAMING TABLE table_name;

APPLY CHANGES INTO LIVE.table_name
FROM source
KEYS (keys)
[IGNORE NULL UPDATES]
[APPLY AS DELETE WHEN condition]
[APPLY AS TRUNCATE WHEN condition]
SEQUENCE BY orderByColumn
[COLUMNS {columnList | * EXCEPT (exceptColumnList)}]
[STORED AS {SCD TYPE 1 | SCD TYPE 2}]
[TRACK HISTORY ON {columnList | * EXCEPT (exceptColumnList)}]
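
Concretely, what I am running looks something like this (the bronze source table, key, and sequence column names here are illustrative):

CREATE OR REFRESH STREAMING TABLE silv_test;

APPLY CHANGES INTO LIVE.silv_test
FROM STREAM(LIVE.brnz_test)
KEYS (id)
SEQUENCE BY last_modified
STORED AS SCD TYPE 1;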

However, when applying this and manually inserting changes into the source data, the following error is presented, which clearly contradicts the Databricks documentation:

Flow 'silv_test' has FAILED fatally. An error occurred because we detected an update or delete to one or more rows in the source table. Streaming tables may only use append-only streaming sources. If you expect to delete or update rows to the source table in the future, please convert table silv_test to a live table instead of a streaming live table. To resolve this issue, perform a Full Refresh to table silv_test. A Full Refresh will attempt to clear all data from table silv_test and then load all data from the streaming source. The non-append change can be found at version 3

If APPLY CHANGES does not resolve this, how should the DLT pipeline be constructed to overcome it?

Upvotes: 0

Views: 1418

Answers (1)

Anupam Chand

Reputation: 2687

First, we need to keep in mind that DLT assumes its input tables are streams, and streams are append-only. Even APPLY CHANGES works only if the input table is a streaming (append-only) source; that is, it expects the changed records to be appended to the input table as new rows. DLT can then use those appended records to update the existing rows in the downstream tables.
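
As a sketch of that pattern (all names here are hypothetical): the CDC feed lands as new parquet files in S3 with an operation flag and an event timestamp, so the bronze table only ever appends, and APPLY CHANGES upserts those rows into the target:

CREATE OR REFRESH STREAMING TABLE brnz_cdc_feed
AS SELECT * FROM cloud_files("s3://my-bucket/cdc/", "parquet");

CREATE OR REFRESH STREAMING TABLE silv_test;

APPLY CHANGES INTO LIVE.silv_test
FROM STREAM(LIVE.brnz_cdc_feed)
KEYS (id)
APPLY AS DELETE WHEN operation = 'DELETE'
SEQUENCE BY event_ts
COLUMNS * EXCEPT (operation, event_ts)
STORED AS SCD TYPE 1;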

Any update or delete to the historical records of the source table will result in the error you provided above. You can read more about this HERE.

For your scenario, if there is no way for the source to append the changed records instead of updating them in place, then the only option is to apply the changes to each layer (Bronze, Silver, Gold) yourself. You don't need to do this in DLT; you can use a separate pyspark notebook. The same applies to DELETEs (for example, deleting data only from the Bronze layer after a specific retention period). Your pipeline will still fail unless you set the skipChangeCommits flag to true, which tells DLT to ignore any changes to the earlier history when reading a streaming table. You can use the python language reference, or for Databricks SQL you can try THIS command while creating/refreshing the table.
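
Following the python route mentioned above, a minimal sketch looks like this (the bronze source table name is hypothetical; skipChangeCommits is the documented reader option):

import dlt

@dlt.table
def silv_test():
    # Read the bronze table as a stream, ignoring update/delete commits
    # in its history instead of failing the pipeline.
    return (
        spark.readStream
        .option("skipChangeCommits", "true")
        .table("brnz_test")  # hypothetical bronze table
    )

Note that the skipped commits are not propagated downstream: any rows changed in place upstream still have to be reconciled separately, which is why the per-layer apply logic described above is still needed.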

Upvotes: 1
