Reputation: 141
Use Case:
I am loading the Bronze layer using an external tool, which automatically creates Bronze Delta tables
in Databricks. However, after the initial load, I need to manually enable changeDataFeed
for the table.
Once it is enabled, I run my Delta Live Tables (DLT) pipeline. Currently, I'm testing this with a single table of ~5.3 million rows (307 columns; I know that's a lot, and I can narrow it down if needed):
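The manual step of enabling the change data feed can be sketched as a table-property change. This is only an illustration, assuming a Databricks notebook where a SparkSession named spark is available; the table name is taken from the pipeline code below:

```python
# Sketch: enable the Delta change data feed on the externally created
# Bronze table after its initial load (runs in a Databricks notebook).
spark.sql("""
    ALTER TABLE lakehouse_poc.yms_oracle_tms.activity
    SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
""")
```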
@dlt.view
def vw_tms_activity_bronze():
    return (
        spark.readStream
        .option("readChangeFeed", "true")
        .table("lakehouse_poc.yms_oracle_tms.activity")
        .withColumnRenamed("_change_type", "_change_type_bronze")
        .withColumnRenamed("_commit_version", "_commit_version_bronze")
        .withColumnRenamed("_commit_timestamp", "_commit_timestamp_bronze")
    )

dlt.create_streaming_table(
    name="tg_tms_activity_silver",
    spark_conf={"pipelines.trigger.interval": "2 seconds"}
)

dlt.apply_changes(
    target="tg_tms_activity_silver",
    source="vw_tms_activity_bronze",
    keys=["activity_seq"],
    sequence_by="_fivetran_synced",
    stored_as_scd_type=1
)
Issue:
When I execute the pipeline, it successfully picks up the data from Bronze and loads it into Silver. However, I am not satisfied with the latency in moving data from Bronze to Silver.
I have attached an image showing:
_fivetran_synced (UTC timestamp): the time when Fivetran last successfully extracted the row.
_commit_timestamp_bronze: the timestamp of the commit in Bronze.
_commit_timestamp_silver: the timestamp of the commit in Silver.
The results show roughly a 2-minute latency between Bronze and Silver. By default, the pipeline trigger interval is 1 minute for complete queries when all input data is from Delta sources. Therefore, I manually set spark_conf = {"pipelines.trigger.interval": "2 seconds"},
but I am not sure whether it actually takes effect.
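To make the latency measurement concrete, the arithmetic behind the "2 minutes" can be sketched as below. The timestamps are made up for illustration; in practice they would come from the _commit_timestamp_bronze and _commit_timestamp_silver values shown in the attached image:

```python
from datetime import datetime, timezone

# Hypothetical commit timestamps illustrating the observed Bronze -> Silver gap.
bronze_commit = datetime(2025, 1, 1, 12, 0, 5, tzinfo=timezone.utc)
silver_commit = datetime(2025, 1, 1, 12, 2, 5, tzinfo=timezone.utc)

# Per-row latency is simply the difference between the two commit times.
latency_seconds = (silver_commit - bronze_commit).total_seconds()
print(latency_seconds)  # 120.0, i.e. the ~2-minute gap described above
```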
Upvotes: 0
Views: 46