BorisG

Reputation: 1

PySpark Structured Streaming 2 SQLs Per Batch (Long addBatch Execution)

I have a PySpark Structured Streaming application (3.3.2) which needs to read input from Kafka in micro-batches and perform complex logic that includes joining data from a few DataFrames. The app is divided into 2 streaming queries:

  1. Load the DataFrames needed for the calculation - this is semi-static data (it changes a few times a day) which is cached for performance reasons, so it can be shared with streaming query 2.
  2. Perform the logic itself, using foreachBatch. Note: the query plan is quite big.

The problem is that for each micro-batch of streaming query 2, I see 2 SQLs running with the same batch_id and run_id - the first takes 20+ seconds and I don't understand what its purpose is; the second runs pretty fast and looks like the actual query execution.

Can someone explain why there are 2 SQLs running, and why the first one takes 20+ seconds?

My intuition was that since the plan is quite large, it takes time to handle the plan (though I expected the plan to be "compiled" once and not re-evaluated on each micro-batch).

I tried the following:

A few things to mention:

See screenshot

Upvotes: 0

Views: 129

Answers (1)

Neil Ramaswamy

Reputation: 166

What's likely going on here is that you are running a stateful query with a watermark; that is, you are using a watermark with deduplication, aggregation, a join, or (Flat)MapGroupsWithState.

If that is the type of query you have, here's why it's happening: the two batches occur because of the way that Structured Streaming (SS) implements stateful operators with respect to watermarks. After all the data in a batch is processed, Structured Streaming does two things, in this order:

  1. It uses the current watermark to determine what elements can be removed from state.
  2. It then updates the watermark, to be used in the next batch, by taking the largest event time seen in the current batch and subtracting the delay given in your .withWatermark call.

The issue is that Structured Streaming might have records in state that cannot be removed by the current watermark but can be removed by the updated watermark. As a result, it runs another "no data" batch, just to apply the watermark from step 2 to the records in state. This behavior is controlled by a config, `spark.sql.streaming.noDataMicroBatches.enabled` (on by default), which is nicely documented in the source code.
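If you want to confirm that the extra per-batch SQL is the no-data batch, you can toggle that config (available since Spark 2.4) and see whether the extra SQL disappears; this sketch assumes an existing SparkSession named `spark`:

```python
# Disable "no data" micro-batches (on by default). With this off, the extra
# per-batch SQL should disappear, but stateful results may be emitted later,
# since eviction then waits for the next data-carrying batch.
spark.conf.set("spark.sql.streaming.noDataMicroBatches.enabled", "false")
```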

An Example

Let's consider a 5-minute tumbling aggregation (i.e. windows are 0-5, 5-10, etc.) with a 10-minute watermark delay. Suppose we're just counting the number of elements in each window.

In our first batch, our watermark starts at 0; let's say we receive two records at time 12 and 13. After processing them, our internal state looks like:

  • [10, 15] -> 2 records

Our current watermark is 0, which is not greater than the window end of 15, so that window's result isn't emitted downstream. Then the watermark updates to 13 - 10 = 3. The no-data batch runs, but since 3 is still less than 15, nothing is emitted.

So the no-data batch didn't quite help there, but let's consider what happens when we receive a few more records: 14, 26, 28. After processing them, our state looks like:

  • [10, 15] -> 3 records
  • [25, 30] -> 2 records

Our current watermark is still 3, which is not greater than 15, so nothing is emitted. Then the watermark is updated to 28 - 10 = 18. Structured Streaming then runs its no-data batch. Since 18 is greater than the window end of 15, the [10, 15] -> 3 records entry is emitted downstream. Then, our state looks like:

  • [25, 30] -> 2 records

So, the no-data batch is helpful in that it emits results as soon as possible (within the watermark architecture of Structured Streaming). Note that this behavior applies to any stateful operator, not just aggregations.
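The walkthrough above can be replayed with a small pure-Python model of the state and watermark bookkeeping. This is illustrative only, not Spark code; the function and variable names are made up for this sketch:

```python
# Toy model of Structured Streaming's per-batch watermark mechanics.
# Window size 5 minutes, watermark delay 10 minutes; all times in minutes.
WINDOW, DELAY = 5, 10

def process_batch(state, watermark, events):
    """Run one data batch followed by its no-data batch.

    Returns (emitted_windows, new_state, new_watermark)."""
    # Ingest: bucket each event into its tumbling window, counting elements.
    for t in events:
        start = (t // WINDOW) * WINDOW
        key = (start, start + WINDOW)
        state[key] = state.get(key, 0) + 1

    # Step 1: the CURRENT watermark decides which windows can be emitted
    # and evicted now (a window is final once the watermark passes its end).
    emitted = {w: n for w, n in state.items() if w[1] < watermark}
    state = {w: n for w, n in state.items() if w[1] >= watermark}

    # Step 2: advance the watermark using this batch's max event time
    # minus the .withWatermark delay.
    if events:
        watermark = max(watermark, max(events) - DELAY)

    # The no-data batch applies the UPDATED watermark to the remaining state.
    emitted.update({w: n for w, n in state.items() if w[1] < watermark})
    state = {w: n for w, n in state.items() if w[1] >= watermark}
    return emitted, state, watermark

state, wm = {}, 0
out1, state, wm = process_batch(state, wm, [12, 13])      # nothing emitted, wm -> 3
out2, state, wm = process_batch(state, wm, [14, 26, 28])  # emits (10, 15) -> 3, wm -> 18
```

Running it reproduces the example: the first batch emits nothing even after its no-data batch, while the second batch's no-data batch emits the [10, 15] window with a count of 3, leaving only [25, 30] in state.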

Upvotes: 0
