Yuva

Reputation: 3163

Databricks DLT: reading a table from one schema (bronze), processing CDC data, and storing it in another schema (processed)

I am developing an ETL pipeline using Databricks DLT pipelines for CDC data that I receive from Kafka. I have successfully created two pipelines, for the landing and raw zones. The raw tables contain an operation flag and a sequence column, and I would like to process the CDC data and store the clean data in the processed layer as SCD type 1 tables. I am having difficulties reading a table from one schema, applying the CDC changes, and loading it into the target schema's tables.

I have 100-plus tables, so I am planning to loop through the tables in the RAW layer, apply the CDC changes, and move them to the processed layer. Below is the code I have tried (I have left the commented-out code in just for your reference).

import dlt
from pyspark.sql.functions import *
from pyspark.sql.types import *

raw_db_name = "raw_db"
processed_db_name = "processed_db_name"

def generate_curated_table(src_table_name, tgt_table_name, df):
    
#     @dlt.view(
#         name= src_table_name,
#         spark_conf={
#            "pipelines.incompatibleViewCheck.enabled": "false"
#           },
#         comment="Processed data for " + str(src_table_name)
#       )

# #     def create_target_table():
# #         return (df)
    
#     dlt.create_target_table(name=tgt_table_name,
#       comment= f"Clean, merged {tgt_table_name}",
#       #partition_cols=["topic"],
#       table_properties={
#         "quality": "silver"
#       }
#       )

#     @dlt.view
#     def users():
#         return spark.readStream.format("delta").table(src_table_name)

    @dlt.view
    def raw_tbl_data():
        return df


    dlt.create_target_table(name=tgt_table_name,
      comment="Clean, merged customers",
      table_properties={
        "quality": "silver"
      })

    dlt.apply_changes(
        target = tgt_table_name,
        source = f"{raw_db_name}.raw_tbl_data,
        keys = ["id"],
        sequence_by = col("timestamp_ms"),
        apply_as_deletes = expr("op = 'DELETE'"),
        apply_as_truncates = expr("op = 'TRUNCATE'"),
        except_column_list = ["id", "timestamp_ms"],
        stored_as_scd_type = 1
       )
    return
    
tbl_name = 'raw_po_details'

df = spark.sql(f'select * from {raw_db_name}.{tbl_name}')
processed_tbl_name = tbl_name.replace("raw", "processed")   # processed_po_details
generate_curated_table(tbl_name, processed_tbl_name, df)

I have tried dlt.view(), dlt.table(), dlt.create_streaming_live_table(), and dlt.create_target_table(), but I end up with one of the following errors:

AttributeError: 'function' object has no attribute '_get_object_id'

pyspark.sql.utils.AnalysisException: Failed to read dataset '<raw_db_name.mytable>'. Dataset is not defined in the pipeline

Expected result:

  1. Read the dataframe that is passed in as a parameter (from RAW_DB), and
  2. Create the new tables in PROCESSED_DB, which is configured in the DLT pipeline settings.

https://www.databricks.com/blog/2022/04/27/how-uplift-built-cdc-and-multiplexing-data-pipelines-with-databricks-delta-live-tables.html

https://cprosenjit.medium.com/databricks-delta-live-tables-job-workflows-orchestration-patterns-bc7643935299

I would appreciate any help.

Thanks in advance

Upvotes: 1

Views: 2725

Answers (1)

Yuva

Reputation: 3163

I found the solution myself and got it working, thanks to all. I am adding it here so it can serve as a reference for others.

import dlt
from pyspark.sql.functions import *
from pyspark.sql.types import *

def generate_silver_tables(target_table, source_table):

    # Staging dataset defined inside the pipeline; apply_changes reads from
    # this dataset name rather than a database-qualified table name.
    @dlt.table
    def customers_filteredB():
        return spark.table(f"{raw_dbname}.{source_table}")

    ### Create the target table definition
    dlt.create_target_table(
        name=target_table,
        comment=f"Clean, merged {target_table}",
        #partition_cols=["topic"],
        table_properties={
            "quality": "silver",
            "pipelines.autoOptimize.managed": "true"
        }
    )

    ## Do the merge
    dlt.apply_changes(
        target = target_table,
        source = "customers_filteredB",
        keys = ["id"],
        apply_as_deletes = expr("operation = 'DELETE'"),
        sequence_by = col("timestamp_ms"),  # any column that identifies the order of events, e.g. a timestamp or auto-incrementing ID
        ignore_null_updates = False,
        except_column_list = ["operation", "timestamp_ms"],
        stored_as_scd_type = "1"
    )
    return

raw_dbname = "raw_db"
raw_tbl_name = 'raw_table_name'
processed_tbl_name = raw_tbl_name.replace("raw", "processed")
generate_silver_tables(processed_tbl_name, raw_tbl_name)
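
The change that resolves the "Dataset is not defined in the pipeline" error is that apply_changes now reads from a dataset defined inside the same pipeline (the customers_filteredB table) rather than a database-qualified name. To cover the 100-plus raw tables mentioned in the question, the same pattern can be driven from a loop. The following is only a minimal sketch, not part of the answer above: the generate_silver_tables_multi name, the _staging suffix, the use of spark.catalog.listTables for discovery, and the raw/processed naming convention are all assumptions. Each staging dataset gets a unique name so the per-table definitions do not collide inside the pipeline.

import dlt
from pyspark.sql.functions import col, expr

raw_dbname = "raw_db"

# Hypothetical multi-table variant of generate_silver_tables: the staging
# dataset name is derived from the source table so that looping over many
# tables does not register duplicate dataset names in the pipeline.
def generate_silver_tables_multi(target_table, source_table):
    staging_name = f"{source_table}_staging"   # assumed naming scheme

    @dlt.table(name=staging_name)
    def staging():
        return spark.table(f"{raw_dbname}.{source_table}")

    dlt.create_target_table(
        name=target_table,
        comment=f"Clean, merged {target_table}",
        table_properties={"quality": "silver"}
    )

    dlt.apply_changes(
        target = target_table,
        source = staging_name,
        keys = ["id"],
        apply_as_deletes = expr("operation = 'DELETE'"),
        sequence_by = col("timestamp_ms"),
        except_column_list = ["operation", "timestamp_ms"],
        stored_as_scd_type = "1"
    )

# Assumed discovery step: list every table in the raw schema and generate
# the corresponding processed table.
for tbl in spark.catalog.listTables(raw_dbname):
    if tbl.name.startswith("raw"):
        generate_silver_tables_multi(tbl.name.replace("raw", "processed"), tbl.name)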

Upvotes: 2
