Reputation: 141
I have the code below, which fails when I'm attempting to do a stream-stream left outer join.
@dlt.view
def vw_ix_f_activity_gold():
    """Streaming view that left-joins activity change-feed rows to the activity dimension.

    The left side (alias ``ACT``) streams
    ``lakehouse_poc.poc_streaming.activity_silver`` with ``readChangeFeed``
    enabled. The right side (alias ``DAC``) streams
    ``lakehouse_poc.poc_streaming.ix_d_activity_gold`` with a 5-second
    watermark on ``_fivetran_synced``. Rows are matched on
    ``ACT.activity_seq == DAC.BK_activity_seq``.

    NOTE(review): Spark Structured Streaming supports a stream-stream left
    outer join only when the join condition also constrains event time (a
    range predicate between a left-side timestamp and the watermarked
    right-side column), or when the watermarked column is itself one of the
    join keys. The condition below is a key equality only, so the watermark
    on ``DAC`` alone is not sufficient. Two possible fixes, to be chosen by
    someone who knows the data:
      * add a time-range predicate to the join condition (and optionally a
        watermark on ``ACT`` so its state can be cleaned up too); rows
        outside the chosen bound will not match, or
      * if ``DAC`` is a lookup table, read it with ``spark.read.table``
        so the join becomes stream-static, which needs no watermark.
    The event-time column on ``ACT`` and an acceptable bound are not visible
    in this snippet -- confirm before changing the join.

    Returns:
        The joined streaming DataFrame with the selected columns.
    """
    # Left side: change-feed stream of the silver activity table.
    activity_changes = (
        spark.readStream
        .option("readChangeFeed", "true")
        .table("lakehouse_poc.poc_streaming.activity_silver")
        .alias("ACT")
    )

    # Right side: Oracle activity dimension, watermarked on its sync timestamp.
    activity_dim = (
        spark.readStream
        .table("lakehouse_poc.poc_streaming.ix_d_activity_gold")
        .withWatermark("_fivetran_synced", "5 seconds")
        .alias("DAC")
    )

    # Key-equality match only; see the NOTE(review) in the docstring.
    same_activity = F.col("ACT.activity_seq") == F.col("DAC.BK_activity_seq")

    return (
        activity_changes
        .join(activity_dim, same_activity, "left")
        # Select and rename columns
        .select(
            ...  # column list elided in the original snippet
        )
    )
# Declare the streaming table that apply_changes writes into.
dlt.create_streaming_table(name="ix_f_activity_gold")

# Apply changes from the view to the target table: rows are keyed on
# BK_activity_seq, ordered by _fivetran_synced, and stored as SCD type 1.
# None of the arguments passed here configure watermarking; the watermark is
# set on the streaming DataFrame inside the source view.
dlt.apply_changes(
    target="ix_f_activity_gold",
    source="vw_ix_f_activity_gold",
    keys=["BK_activity_seq"],
    sequence_by="_fivetran_synced",
    stored_as_scd_type=1,
)
I'm not sure why this fails, since I do have the watermarks. Do I also need to specify them in the
apply_changes part too?
Upvotes: 0
Views: 17