Jesper Martinsson

Reputation: 41

Handling Incremental Data Loading and SCD Type 2 for joined tables in Delta Live Tables on Databricks

I'm working on a project utilizing Delta Live Tables on Databricks, where I need to create a dimension (Kimball style) with slowly changing dimension type 2. The dimension is the result of a join between several bronze tables incrementally loaded using autoloader. The resulting silver table needs to be streaming or append-only since I need to use it as a source for a streaming table loaded using apply changes with scd type 2. However, I'm facing challenges due to the streaming behavior.

Here's the breakdown of the scenario:

Bronze Layer (bronze_raw): Data is loaded incrementally using autoloader once a day.

Silver Layer: Business logic is applied here to create the dimension, and I need the resulting tables to be either streaming or append-only.

SCD Type 2 Handling (silver_full_hist): The silver tables serve as a source for a streaming table where we apply changes to implement SCD Type 2.

The issue arises during the join operation in the silver layer. Since streaming handles only new rows, joining with existing records can lead to missing data. For instance, if a new customer is added in the CRM system and we need to join the account table with the user table to retrieve the customer representative, whose record hasn't changed, an inner join would return no rows because the matching user rows already exist and are not read again. A left join, on the other hand, would end up with a null value for the rep column even though one exists.

I'm seeking guidance on how to achieve a seamless flow in this scenario. Any insights or best practices for implementing this workflow in Delta Live Tables would be highly appreciated.

I noticed another question on the topic: Joining Tables for Databricks Delta Live Tables SCD Type 2

It got an answer saying that SCD type 2 needs to be applied to each of the bronze tables before joining them. However, then you would need to decide from which of the bronze tables to pick the __START_AT and __END_AT columns for the resulting dimension. This might not be obvious in all cases and may force you to write very complicated logic.
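To illustrate what I mean, here is a rough sketch (not working code, and the helper, column, and table names are just assumptions) of the kind of interval reconciliation you would need if SCD type 2 had been applied to each bronze table before the join: the joined row is only valid where both source versions overlap, so __START_AT and __END_AT must be derived from both sides.

from pyspark.sql import functions as F

# Hypothetical helper: join two SCD2 histories and intersect their validity windows.
# An open interval (__END_AT IS NULL) is treated as extending to 9999-12-31.
def join_scd2_histories(account_hist, user_hist):
    far_future = F.to_timestamp(F.lit("9999-12-31"))
    a_end = F.coalesce(F.col("a.__END_AT"), far_future)
    u_end = F.coalesce(F.col("u.__END_AT"), far_future)

    return (
        account_hist.alias("a")
        .join(
            user_hist.alias("u"),
            (F.col("a.rep_id") == F.col("u.user_id"))
            # keep only row versions whose validity intervals overlap
            & (F.col("a.__START_AT") < u_end)
            & (F.col("u.__START_AT") < a_end),
        )
        .select(
            "a.customer_name",
            "u.rep_name",
            # the joined version starts when the later of the two versions started...
            F.greatest("a.__START_AT", "u.__START_AT").alias("__START_AT"),
            # ...and ends when the earlier of the two versions ended
            # (least() skips NULLs, so two open intervals stay open)
            F.least("a.__END_AT", "u.__END_AT").alias("__END_AT"),
        )
    )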

Below is what I have experimented with so far trying to create a streaming silver table to be used as source for apply changes:

import dlt
from datetime import datetime
from pyspark.sql import functions as F

silver_data_load_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

# Ingest files using Auto Loader. Customer representatives are found in a table called 'user'.
@dlt.table(
    name="bronze_account_raw"
)
def collect_raw_bronze_account():
    return (
        spark.readStream
            .format("cloudFiles")
            .options(**csv_file_options)
            .load(directory_account)
    )

@dlt.table(
    name="bronze_user_raw"
)
def collect_raw_bronze_user():
    return (
        spark.readStream
            .format("cloudFiles")
            .options(**csv_file_options)
            .load(directory_user)
    )

# Get the latest state of the sources
dlt.create_streaming_table(name="bronze_account_latest")

dlt.apply_changes(
    target="bronze_account_latest",
    source="bronze_account_raw",
    keys=["account_id"],
    sequence_by="commit_timestamp",
    apply_as_deletes=F.expr("operation = 'delete'"),
    except_column_list=["operation"],
    stored_as_scd_type=1
)

dlt.create_streaming_table(name="bronze_user_latest")

dlt.apply_changes(
    target="bronze_user_latest",
    source="bronze_user_raw",
    keys=["user_id"],
    sequence_by="commit_timestamp",
    apply_as_deletes=F.expr("operation = 'delete'"),
    except_column_list=["operation"],
    stored_as_scd_type=1
)

# Get the dimension by joining the bronze tables. Created as a temporary table here since I initially thought it would then forget about the state, read all rows in the bronze tables, and not raise the exception about deletes and updates (not the case, it still raised the error).

transform_query = """
SELECT a.customer_name
      ,u.rep_name
FROM STREAM(LIVE.bronze_account_latest) AS a
INNER JOIN STREAM(LIVE.bronze_user_latest) AS u ON a.rep_id = u.user_id
"""

@dlt.table(
    name="silver_customer",
    temporary=True
)
def load_silver():
    df = spark.sql(transform_query)
    return df.withColumn("silver_commit_timestamp", F.lit(silver_data_load_timestamp))

# Here I expect an SCD type 2 table based on the silver table. I can then choose to (1) get the latest state of the dimension by filtering on __END_AT IS NULL or (2) get the state for a particular point in time.

dlt.create_streaming_table(
    name="silver_customer_scd",
    table_properties={"quality": "silver"}
)

dlt.apply_changes(
    target="silver_customer_scd",
    source="silver_customer",
    keys=["customer_name"],
    sequence_by="silver_commit_timestamp",
    stored_as_scd_type=2
)

Upvotes: 4

Views: 855

Answers (1)

rupaj

Reputation: 66

Streaming tables inherit the processing guarantees of Apache Spark Structured Streaming and are configured to process queries from append-only data sources, where new rows are always inserted into the source table rather than modified.

Once any record is modified in your 'bronze_account_latest' or 'bronze_user_latest', these tables cannot be used as a streaming source for 'silver_customer_scd' unless you are OK with doing a 'Full Refresh' on 'silver_customer_scd', or you use 'skipChangeCommits' as mentioned here: https://docs.databricks.com/en/structured-streaming/delta-lake.html#ignore-changes
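A minimal sketch of the skipChangeCommits route, reusing the table names from the question (whether the semantics fit is up to you, since updates and deletes in the source are simply ignored by the stream):

import dlt

# Read the SCD1 tables as streams but skip commits that update or delete rows,
# so the stream does not fail when apply_changes rewrites the source tables.
@dlt.table(name="silver_customer")
def load_silver():
    accounts = (
        spark.readStream
            .option("skipChangeCommits", "true")
            .table("LIVE.bronze_account_latest")
    )
    users = (
        spark.readStream
            .option("skipChangeCommits", "true")
            .table("LIVE.bronze_user_latest")
    )
    return accounts.join(users, accounts.rep_id == users.user_id, "inner")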

An alternative is to make 'silver_customer_scd' a materialized view; DLT takes care of refreshing it when the underlying tables are updated: https://docs.databricks.com/en/views/materialized-views-how-it-works.html
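A sketch of the materialized view route, under the assumption that the joined dimension itself is defined as a non-streaming @dlt.table (which DLT maintains as a materialized view); the table name and columns here are assumptions:

import dlt

# Non-streaming table definition: DLT maintains this as a materialized view and
# refreshes it when bronze_account_latest or bronze_user_latest change.
@dlt.table(
    name="silver_customer_mv",
    table_properties={"quality": "silver"}
)
def silver_customer_mv():
    accounts = dlt.read("bronze_account_latest")
    users = dlt.read("bronze_user_latest")
    return (
        accounts.join(users, accounts.rep_id == users.user_id, "inner")
                .select("customer_name", "rep_name")
    )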

Upvotes: 0
