Reputation: 41
In my DLT pipeline, I have three layers: bronze, silver, and gold. The bronze layer reads JSON files from an S3 bucket, the silver layer performs processing tasks such as adding new columns, and the gold layer performs aggregations on the processed data.
I want to write the data from the gold layer of my DLT pipeline to a Kafka topic. However, since DLT doesn't support writeStream operations, I'm performing a readStream on the gold table and then trying to write the data to Kafka in a separate notebook, roughly as sketched below. Since the gold table is a materialized view that is constantly being updated, my readStream fails when I try to extract data from it. If I use the `ignoreChanges` option to work around this, I end up with duplicate rows.
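A minimal sketch of what the separate notebook currently does (the gold table name is a placeholder):

```python
# Roughly what runs in the separate notebook today; assumes a Databricks `spark` session.
stream = (
    spark.readStream
    .format("delta")
    .option("ignoreChanges", "true")   # without this the stream fails; with it I get duplicates
    .table("gold_table_name")          # placeholder name for my gold materialized view
)
# ...followed by a writeStream that publishes to Kafka
```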
What would be the most effective way to handle this?
Upvotes: 2
Views: 1252
Reputation: 14277
So, if you are changing the data of a table (in this case through an overwrite), you can't read it as a regular stream. Another approach that may work is Change Data Feed (CDF): you will be able to consume CDC-like change events from the gold Delta table, similar to what you would get from a CDC tool such as Debezium. The following steps should work.

First, set the table property `delta.enableChangeDataFeed` to `true` on the gold table.
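In a DLT pipeline this property can be set on the table definition itself. Here is a minimal sketch; the table names and the aggregation logic are placeholders standing in for your actual gold layer:

```python
import dlt
from pyspark.sql import functions as F

@dlt.table(
    name="gold_table_name",                                   # placeholder table name
    table_properties={"delta.enableChangeDataFeed": "true"}   # enable CDF for this table
)
def gold_table():
    # placeholder aggregation standing in for the real gold-layer logic
    return (
        dlt.read("silver_table_name")
        .groupBy("some_key")
        .agg(F.count("*").alias("event_count"))
    )
```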
Then read the change feed from the gold table as a stream:

```python
cdf_stream = (
    spark
    .readStream
    .format("delta")
    .option("readChangeFeed", "true")   # read change events instead of the table contents
    .option("startingVersion", 0)       # start from the first version of the table
    .table("gold_table_name")
)
```
The resulting stream contains the additional columns `_change_type`, `_commit_version`, and `_commit_timestamp`, and you may want to filter or transform the stream before writing it out. More documentation can be found here.
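As an example of that last step, here is a sketch of a Kafka sink on top of the `cdf_stream` defined above; the broker address, topic, and checkpoint path are placeholders:

```python
from pyspark.sql import functions as F

(
    cdf_stream
    # keep only rows that represent new or updated data
    .filter(F.col("_change_type").isin("insert", "update_postimage"))
    # drop the CDF metadata columns before serializing
    .drop("_change_type", "_commit_version", "_commit_timestamp")
    # the Kafka sink expects a `value` column; serialize each row as JSON
    .select(F.to_json(F.struct("*")).alias("value"))
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker1:9092")               # placeholder broker
    .option("topic", "gold_topic")                                    # placeholder topic
    .option("checkpointLocation", "/tmp/checkpoints/gold_to_kafka")   # placeholder path
    .start()
)
```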
Upvotes: 2