Reputation: 1
I have a PySpark Structured Streaming application (3.3.2) that needs to read input from Kafka in micro-batches and perform complex logic, which includes joining data from a few data frames. The app is divided into 2 streaming queries:
Can someone explain why there are 2 SQLs running, and why the first one takes 20+ seconds?
My intuition was that since the plan is quite large, it takes time to handle the plan (though I expected the plan to be "compiled" once and not re-evaluated on each micro-batch).
I tried the following: the spark.sql.cbo.enabled feature flag.

A few things to mention:
Upvotes: 0
Views: 129
Reputation: 166
What's likely going on here is that you are running a stateful query with a watermark; that is, using a watermark with either deduplication, aggregation, a join, or (Flat)MapGroupsWithState.
If that is the type of query you have, here's why it's happening: two batches occur due to the way that Structured Streaming (SS) implements stateful operators (with respect to watermarks). After all data is processed in the batch, Structured Streaming does two things, in this order:

1. It evaluates the existing state against the current watermark, emitting downstream anything the watermark has already passed.
2. It updates the watermark, based on the maximum event time seen in the batch and the delay specified in your .withWatermark call.

The issue is that Structured Streaming might have records in state that cannot be removed by the current watermark, but can be removed by the updated watermark. As a result, it runs another "no data" batch, just to apply the watermark from step 2 to the records in state. This behavior is actually controlled by a config, which is nicely documented in the source code.
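If you want to experiment with this yourself, here's a minimal sketch. I'm assuming the config in question is spark.sql.streaming.noDataMicroBatches.enabled, the flag that gates no-data micro-batches in Spark 3.x; treat the exact name as an assumption if you're on a different release:

```python
# Minimal sketch (assumes spark.sql.streaming.noDataMicroBatches.enabled is
# the config referenced above; it defaults to "true" in Spark 3.x).
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("no-data-batch-demo").getOrCreate()

# Inspect the current setting; the default enables the extra "no data" batch.
print(spark.conf.get("spark.sql.streaming.noDataMicroBatches.enabled"))

# Turning it off skips the extra batch, at the cost of expired state only
# being emitted/evicted when the next batch that contains data runs.
spark.conf.set("spark.sql.streaming.noDataMicroBatches.enabled", "false")
```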
Let's consider a 5 minute tumbling aggregation (i.e. windows are 0-5, 5-10, etc.) with a 10 minute watermark. Suppose we're just counting the number of elements in each window.
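As a concrete sketch of such a query (the Kafka connection details and the assumption that the message value is simply an event-time string are placeholders, not taken from the original question):

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("watermark-demo").getOrCreate()

events = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")  # placeholder broker
    .option("subscribe", "events")                         # placeholder topic
    .load()
    # For simplicity, assume the Kafka value is an event-time string.
    .select(F.col("value").cast("string").cast("timestamp").alias("event_time"))
)

counts = (
    events
    .withWatermark("event_time", "10 minutes")     # watermark = max(event_time) - 10 min
    .groupBy(F.window("event_time", "5 minutes"))  # tumbling windows: [0, 5), [5, 10), ...
    .count()
)

# In append mode, a window's count is only emitted once the watermark passes
# the window's end timestamp.
query = counts.writeStream.outputMode("append").format("console").start()
```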
In our first batch, our watermark starts at 0; let's say we receive two records at time 12 and 13. After processing them, our internal state looks like:
[10, 15] -> 2 records
Our current watermark is 0, which is not greater than 15, so that record isn't emitted downstream. Then, the watermark updates to 13 - 10, which is 3. The no-data batch runs, but since 3 is still less than 15, nothing is emitted.
So the no-data batch didn't quite help there, but let's consider what happens when we receive a few more records: 14, 26, 28. After processing them, our state looks like:
[10, 15] -> 3 records
[25, 30] -> 2 records
Our current watermark is still 3, which is not greater than 15, so nothing is emitted. Then, the watermark is updated to be 28 - 10 = 18. Structured Streaming then runs its no-data batch. Since 18 is greater than the end timestamp of 15, the [10, 15] -> 3 records entry is emitted downstream. Then, our state looks like:
[25, 30] -> 2 records
So, the no-data batch is helpful in that it emits results as soon as possible (within the watermark architecture of Structured Streaming). Note that this behavior applies to any stateful operator, not just aggregations.
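To make the arithmetic explicit, here's a tiny Spark-free simulation of those two batches. It only illustrates the watermark rule described above; it is not how Spark actually implements state:

```python
# Illustration only: replay the walkthrough (window size 5, watermark delay 10).
WINDOW = 5   # minutes
DELAY = 10   # minutes

state = {}           # (window_start, window_end) -> count
watermark = 0
max_event_time = 0

def emit(wm):
    """Emit every window whose end timestamp the watermark has passed."""
    for key in sorted(k for k in state if k[1] < wm):
        print(f"emit window {key} -> {state.pop(key)} records (watermark={wm})")

def process_batch(events):
    global watermark, max_event_time
    # Fold the batch's events into state.
    for t in events:
        start = (t // WINDOW) * WINDOW
        state[(start, start + WINDOW)] = state.get((start, start + WINDOW), 0) + 1
        max_event_time = max(max_event_time, t)
    # Step 1: evaluate state against the *current* watermark.
    emit(watermark)
    # Step 2: update the watermark; the extra "no data" batch then applies it.
    watermark = max_event_time - DELAY
    emit(watermark)

process_batch([12, 13])      # nothing emitted; watermark advances 0 -> 3
process_batch([14, 26, 28])  # watermark advances 3 -> 18, so [10, 15] is emitted
```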
Upvotes: 0