dispanser

Reputation: 91

Structured Streaming vs Batch Performance differences

We have a job that aggregates data over time windows. We're new to Spark, and we see significantly different performance when running what is logically the same query as a streaming job versus a batch job. We want to understand what's going on and find ways to speed up the Structured Streaming approach.

For the sake of this post, suppose the schema is

root
 |-- objectId: long (nullable = true)
 |-- eventTime: long (nullable = true)
 |-- date: date (nullable = true)
 |-- hour: integer (nullable = true)

where

We're running a structured streaming job basically doing:

df.read.format("delta")
  .withWatermark("eventTime", "7 minutes") // watermark only applied to streaming query
  .groupBy($"date", $"hour", $"objectId", window($"eventTime", "5 minutes"))
  .coalesce(1) // debatable; we like limited number of files
  .writeStream
  .format("delta")
  .option("checkpointLocation", <...>)
  .partitionBy("date", "hour")
  .start(<destination url>)
  .awaitTermination

The associated batch job does essentially the same thing, minus withWatermark and with the batch equivalents of writeStream etc. It reads from exactly the same source, so it reads exactly the same files, with the same sizes.

We are running these on:

Observations:

My understanding is that, given the watermark (7 minutes) and the window size (5 minutes), the streaming query only has to look back about 12 minutes (window size plus watermark), i.e. less than 15 minutes, before it can write out a 5-minute window and discard all associated state.
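To make that arithmetic concrete, here is a minimal sketch in plain Scala (no Spark involved). It assumes tumbling windows aligned to the epoch and assumes a window may be finalized once the maximum event time seen so far is at least the window end plus the watermark delay. The object and method names are made up for illustration, and this is my reading of the semantics, not a statement of what Spark does internally.

```scala
import java.time.{Duration, Instant}

// Hypothetical helper, not part of Spark: models when a tumbling window could close.
object WatermarkMath {
  val windowSize: Duration = Duration.ofMinutes(5)     // window($"eventTime", "5 minutes")
  val watermarkDelay: Duration = Duration.ofMinutes(7) // withWatermark(..., "7 minutes")

  // Start of the tumbling window that contains t (assumes windows aligned to the epoch).
  def windowStart(t: Instant): Instant = {
    val sizeMs = windowSize.toMillis
    Instant.ofEpochMilli(Math.floorDiv(t.toEpochMilli, sizeMs) * sizeMs)
  }

  // Smallest "max event time seen" at which the watermark reaches the end of t's window.
  def earliestMaxEventTimeToClose(t: Instant): Instant =
    windowStart(t).plus(windowSize).plus(watermarkDelay)

  def main(args: Array[String]): Unit = {
    val event = Instant.parse("2020-01-01T10:02:00Z")
    println(windowStart(event))                 // 2020-01-01T10:00:00Z
    println(earliestMaxEventTimeToClose(event)) // 2020-01-01T10:12:00Z
  }
}
```

Under these assumptions, state for the 10:00 to 10:05 window is needed until event times reach 10:12, which is the 12-minute bound. In practice the watermark only advances between micro-batches, so the real delay may be somewhat longer.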

Questions:

Upvotes: 3

Views: 1433

Answers (1)

Vasily Lebedev

Reputation: 11

df.read.format("delta")

It looks like you are creating a static DataFrame and then converting it into a streaming one. The aggregations are applied to the static DataFrame, so windowing might not work as intended. Try creating a streaming DataFrame instead:

  val DF = spark
    .readStream
    .format("delta")...

Some examples can be found here: https://docs.databricks.com/delta/delta-streaming.html#delta-table-as-a-stream-source
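For reference, a rough sketch of how the whole query from the question might look when it starts from a streaming read. Several things here are assumptions on my side, not taken from the question: the three paths are placeholders, eventTime is assumed to hold epoch seconds (withWatermark and window need a timestamp column, so it is cast into a new eventTs column), and count() stands in for whatever aggregation the real job performs.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, window}

object StreamingWindowSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("streaming-window-sketch").getOrCreate()

    // Placeholder locations (assumptions); substitute the real table paths.
    val sourcePath = "/tmp/example/source"
    val destinationPath = "/tmp/example/destination"
    val checkpointPath = "/tmp/example/checkpoint"

    // readStream (not read) yields a streaming DataFrame over the Delta table.
    val events = spark.readStream
      .format("delta")
      .load(sourcePath)
      // Assumption: eventTime is epoch seconds, so a cast gives a proper timestamp.
      .withColumn("eventTs", col("eventTime").cast("timestamp"))

    val aggregated = events
      .withWatermark("eventTs", "7 minutes")
      .groupBy(col("date"), col("hour"), col("objectId"), window(col("eventTs"), "5 minutes"))
      .count() // stand-in aggregation; the question does not show the real one

    aggregated.writeStream
      .format("delta")
      .outputMode("append") // with a watermark, a window is written once it is finalized
      .option("checkpointLocation", checkpointPath)
      .partitionBy("date", "hour")
      .start(destinationPath)
      .awaitTermination()
  }
}
```

The watermark is declared on the same column that the window is built from; otherwise Spark cannot use it to drop old aggregation state.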

Upvotes: 1
