LucasVaz97

Reputation: 41

How to WriteStream Delta live tables to a Kafka topic

In my DLT pipeline, I have three layers: bronze, silver, and gold. The bronze layer reads JSON files from an S3 bucket, while the silver layer performs data processing tasks such as adding new columns. The gold layer is responsible for performing aggregations on the processed data.

I want to write the data from the gold layer of my DLT pipeline to a Kafka topic. However, since DLT doesn't support writeStream operations, I'm performing a readStream operation on the gold table and then trying to write the data to Kafka in a separate notebook. Since the gold table is a materialized view that is constantly being updated, my readStream code fails when I try to extract data from it. If I try to use the ignoreChanges option to prevent this issue, the data ends up being duplicated.

What would be the most effective way to handle this?

Upvotes: 2

Views: 1252

Answers (1)

partlov

Reputation: 14277

So, if you are changing the data of a table (in this case through an overwrite), you can't read it as a stream. There is another solution which may work: Change Data Feed (CDF). Basically, you will be able to consume CDC-like events from the gold Delta table, similar to what you would get from CDC tools such as Debezium. The following steps should work:

  1. Enable CDF on the gold table by setting the table property delta.enableChangeDataFeed to true (a sketch of the table definition follows after this list)
  2. Consume the CDF as a stream with the following PySpark code:
(
  spark
  .readStream
  .format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .table("gold_table_name")
)
  3. Besides the columns from the original table, this stream will additionally have the columns _change_type, _commit_version, and _commit_timestamp, and you may want to filter or transform the stream before writing
  4. Write the stream to Kafka in the format you want (see the Kafka write sketch after this list)
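For step 1, if the gold table is defined in a DLT Python notebook, the property can be set directly in the table definition. This is only a minimal sketch: the table name, the source table, and the aggregation are placeholders for whatever your pipeline actually does.

import dlt
from pyspark.sql import functions as F

@dlt.table(
  name="gold_table_name",  # placeholder name
  table_properties={"delta.enableChangeDataFeed": "true"}  # step 1: enable CDF
)
def gold_table():
  # placeholder aggregation over a hypothetical silver table
  return dlt.read("silver_table_name").groupBy("some_key").count()

For a Delta table that already exists outside the pipeline definition, the same property can be set with ALTER TABLE ... SET TBLPROPERTIES (delta.enableChangeDataFeed = true).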
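For steps 3 and 4, here is a minimal sketch of the filter and the Kafka write. The broker address, topic name, and checkpoint location are placeholders, and the filter on _change_type is just one example of a transformation you may want before writing.

from pyspark.sql import functions as F

cdf_stream = (
  spark
  .readStream
  .format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .table("gold_table_name")
)

(
  cdf_stream
  # step 3: drop pre-update images so each change is emitted only once
  .filter(F.col("_change_type") != "update_preimage")
  # the Kafka sink expects a value column; serialize each row as JSON
  .select(F.to_json(F.struct(*[F.col(c) for c in cdf_stream.columns])).alias("value"))
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")  # placeholder broker
  .option("topic", "gold_topic")  # placeholder topic
  .option("checkpointLocation", "/tmp/checkpoints/gold_to_kafka")  # placeholder path
  .start()
)

The checkpointLocation is required for the streaming write, and each change event from the CDF becomes one Kafka message.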

More documentation can be found in the Databricks Change Data Feed documentation.

Upvotes: 2
