Reputation: 117
I have a Delta table which I am reading as a streaming query.
Looking through the Delta table history, using DESCRIBE HISTORY, I am seeing that 99% of the operationMetrics state that numTargetRowsUpdated is 0, with most operations being inserts. However, there are occasionally 2-3 operations which have numTargetRowsUpdated > 1. The operation on the Delta table, however, is a MERGE.
Can I still use a StreamingQuery and read this data as a stream, or would I get errors? i.e.:
from pyspark.sql import DataFrame

# read the Delta table as a stream
df: DataFrame = spark \
    .readStream \
    .format("delta") \
    .load(f"{table_location}")

# process each micro-batch with process_batch; run once and stop
df.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", f"{checkpoint}/{table_location}") \
    .trigger(once=True) \
    .foreachBatch(process_batch) \
    .start()
Now I have another Delta table which is more of a dimension table of customer information, i.e. email, name, last seen, etc.
I was initially reading this as a streaming query in append mode, but I am getting the following error: java.lang.UnsupportedOperationException: Detected a data update
Looking through this table's history with DESCRIBE HISTORY, I see there are a number of updates happening. Question: if I use a streaming query with ignoreChanges set to true, will this send the updated records as new records, which I can process further in foreachBatch?
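For clarity, this is roughly what I mean by using ignoreChanges (just a sketch; customer_table_location and checkpoint are placeholders for my actual paths):

df = spark \
    .readStream \
    .format("delta") \
    .option("ignoreChanges", "true") \
    .load(customer_table_location)

df.writeStream \
    .option("checkpointLocation", f"{checkpoint}/{customer_table_location}") \
    .trigger(once=True) \
    .foreachBatch(process_batch) \
    .start()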
Upvotes: 3
Views: 3318
Reputation: 3676
If there are updates or deletes in your Delta source, the read stream will throw an exception. This is also clear from the Databricks documentation:
Structured Streaming does not handle input that is not an append and throws an exception if any modifications occur on the table being used as a source.
If you set ignoreChanges to true, it will not throw an exception, but it will give you the updated rows plus rows which may have already been processed. This is because everything in a Delta table happens at the file level. For example, if you update a single row in a file, (roughly) the following will happen:
- the Parquet file containing that row is rewritten into a new file, which holds the updated row together with all the unchanged rows from the original file;
- the streaming source picks up the whole new file, so the unchanged rows are emitted again alongside the updated one.
This is also mentioned in the docs.
ignoreChanges: re-process updates if files had to be rewritten in the source table due to a data changing operation such as UPDATE, MERGE INTO, DELETE (within partitions), or OVERWRITE. Unchanged rows may still be emitted, therefore your downstream consumers should be able to handle duplicates. ...
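If duplicates are acceptable to deal with downstream, a common pattern is to upsert each micro-batch into the target with a merge on a business key, so re-emitted rows simply overwrite themselves. A minimal sketch, assuming a customer_id key and a target_table_location path (both placeholders, not from the question):

from delta.tables import DeltaTable

def process_batch(batch_df, batch_id):
    # drop duplicates within the micro-batch, then upsert into the target Delta table
    deduped = batch_df.dropDuplicates(["customer_id"])
    target = DeltaTable.forPath(spark, target_table_location)
    (target.alias("t")
        .merge(deduped.alias("s"), "t.customer_id = s.customer_id")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute())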
You'll have to decide if this is OK for your use case. If you need to specifically handle updates and deletes, Databricks offers Change Data Feed, which you can enable on Delta tables. This gives you row-level details about inserts, updates and deletes (at the cost of some extra storage and IO).
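A minimal sketch of enabling the Change Data Feed and reading it as a stream (the customer_dim table name and starting version are placeholders):

# enable CDF on the dimension table (can also be set when the table is created)
spark.sql("ALTER TABLE customer_dim SET TBLPROPERTIES (delta.enableChangeDataFeed = true)")

# stream the row-level changes instead of the full table
changes = spark \
    .readStream \
    .format("delta") \
    .option("readChangeFeed", "true") \
    .option("startingVersion", 0) \
    .table("customer_dim")

Each change row carries _change_type (insert, update_preimage, update_postimage, delete), _commit_version and _commit_timestamp columns, so the logic inside foreachBatch can branch on the type of change.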
Upvotes: 2