Lionel Bisschoff

Reputation: 41

How does Structured Streaming plan the logical plan of a streaming query for every micro-batch?

I set up a small test on my laptop that does the following:

I created a Kafka topic with a few thousand messages, where each message contains a few rows and each row has about 100 columns.

I created 300 fairly complex Spark columns in a List[Column]. No aggregations.

When setting up the stream from Kafka, I set .option("maxOffsetsPerTrigger", 1) so only one message is processed in each micro-batch.

I then apply the columns to each micro-batch, which consists of just one message.
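For reference, myStream and manyTestCols could be set up roughly like this (broker address, topic name, and the column expressions are placeholders, not my actual test code):

import org.apache.spark.sql.{Column, SparkSession}
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().appName("planning-test").getOrCreate()

// Kafka source, throttled to one message per micro-batch.
val myStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "test-topic")
  .option("maxOffsetsPerTrigger", 1)
  .load()

// Stand-ins for the 300 complex columns.
val manyTestCols: List[Column] =
  (1 to 300).toList.map(i => length(col("value")).as(s"c$i"))

The query itself is then: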

// start() returns the StreamingQuery handle; awaitTermination() blocks.
val testStream = myStream
  .select(manyTestCols: _*)
  .writeStream
  .format("console")
  .outputMode("append")
  .start()

testStream.awaitTermination()

Spark takes about 10 seconds to process each message.

I then change maxOffsetsPerTrigger to .option("maxOffsetsPerTrigger", 1000) so 1000 messages are processed in each micro-batch.

Spark takes about 11 seconds to process all 1000 messages in each micro-batch.

So, it seems that Spark does some kind of "setup work" for each trigger and then processes each micro-batch quite fast once it gets going.
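One way to see where the time goes is to look at the per-batch progress that Structured Streaming reports: durationMs breaks each trigger down into phases, including a queryPlanning entry. A sketch, using the testStream handle from above (scala.jdk.CollectionConverters is the Scala 2.13 spelling):

import scala.jdk.CollectionConverters._

// lastProgress is null until the first micro-batch completes.
// Keys typically include "queryPlanning", "getBatch", "addBatch",
// "walCommit" and "triggerExecution".
Option(testStream.lastProgress).foreach { progress =>
  progress.durationMs.asScala.foreach { case (phase, ms) =>
    println(s"$phase: $ms ms")
  }
}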

Is this "setup work" the query planning, from the logical plan through to the physical plan, repeated for each micro-batch?

If so, does it make sense for Spark to do this for each micro-batch?

Or is something else entirely going on? I'm looking at the Spark source code, but would appreciate feedback from someone who has gone through this exercise already.

Thanks for any insights.

Upvotes: 4

Views: 445

Answers (1)

Jacek Laskowski

Reputation: 74619

Is this "setup work" the query planning, from the logical plan through to the physical plan, repeated for each micro-batch?

Partially, yes. The execution-specific parts of a streaming query's plan have to be filled in at runtime, for every micro-batch, namely (an illustration follows the list):

  1. Proper relations for data sources (e.g. LocalRelation for no-data sources)
  2. Event-time watermark
  3. Current (micro-batch) time
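For example, the last two items are visible in a query like the following (a minimal sketch, assuming a parsed streaming DataFrame with an eventTime timestamp column): the watermark value and the current batch time only exist at runtime, so they are injected into the plan anew for every micro-batch.

import org.apache.spark.sql.functions.{col, window}

// Neither the watermark nor the batch timestamp is known at
// analysis time; they are filled in per micro-batch.
val windowedCounts = parsed
  .withWatermark("eventTime", "10 minutes")
  .groupBy(window(col("eventTime"), "5 minutes"))
  .count()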

If so, does it make sense for Spark to do this for each micro-batch?

Absolutely. There's no other way in Structured Streaming to short-circuit no-data sources, or to track the current time and the watermark.

That's also the reason for the extra no-data micro-batch that is run for stateful operators when, say, the watermark changes.
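That behaviour can be toggled with a SQL configuration (enabled by default since Spark 2.4; verify the property name against your Spark version):

// Toggle the extra no-data micro-batches that let stateful
// operators emit results when only the watermark advanced.
spark.conf.set("spark.sql.streaming.noDataMicroBatches.enabled", "false")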

I'm looking at the Spark source code, but would appreciate feedback from someone who has gone through this exercise already.

See MicroBatchExecution (the per-trigger execution loop) and IncrementalExecution (the query planning of a single micro-batch).

Upvotes: 2
