Reputation: 91
We have a job that aggregates data over time windows. We're new to Spark, and we observe significantly different performance characteristics when running the logically same query as a streaming job versus a batch job. We want to understand what's going on and find possible ways to improve the speed of the structured-streaming-based approach.
For the sake of this post, suppose the schema is
root
|-- objectId: long (nullable = true)
|-- eventTime: long (nullable = true)
|-- date: date (nullable = true)
|-- hour: integer (nullable = true)
where

- date and hour are (derived) partition keys, i.e. parquet files are stored in folders like date=2020-07-26/hour=4
- objectId is widely spread (10 million distinct values observed in an hour, very uneven distribution)
- the job aggregates per objectId, in 5 minute buckets

We're running a structured streaming job basically doing:
df.read.format("delta")
.withWatermark("7 minutes") // watermark only applied to streaming query
.groupBy($"date", $"hour", $"objectId", window($"eventTime", "5 minutes"))
.coalesce(1) // debatable; we like limited number of files
.partitionBy("date", "hour")
.writeStream
.format("delta")
.option("checkpointLocation", <...>)
.partitionBy("date", "hour")
.start(<destination url>)
.awaitTermination
The associated batch job basically does the same thing, with the exception of withWatermark and with comparable replacements for writeStream etc. It reads from exactly the same source, so it will read exactly the same files, with the same sizes etc.
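Roughly, the batch counterpart looks like the following sketch (the paths and the count aggregation are placeholders, not the exact job):

import org.apache.spark.sql.functions.window
import spark.implicits._

val df = spark.read.format("delta").load("<source url>") // same Delta source as the streaming job

df.groupBy($"date", $"hour", $"objectId", window($"eventTime", "5 minutes"))
  .count()     // placeholder aggregation (the streaming snippet above elides the actual agg too)
  .coalesce(1) // same "few output files" preference
  .write
  .format("delta")
  .mode("append")
  .partitionBy("date", "hour")
  .save("<destination url>")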
We are running these on:
Observations:
- most of the time in the streaming job is spent in addBatch

My understanding is that the streaming query, given the watermark (7 minutes) and the window size (5 minutes), only has to look back less than 15 minutes before it can write out a 5 minute window and discard all associated state.
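To make that arithmetic concrete, here is the back-of-the-envelope view (an illustration of the watermark semantics as we understand them, not of Spark internals):

// A 5 minute window [10:00, 10:05) plus a 7 minute watermark means the window
// can be emitted and its state dropped once the max observed eventTime reaches
// 10:05 + 7 minutes = 10:12, i.e. well under 15 minutes after the window opened.
val windowEnd   = java.time.LocalTime.parse("10:05")
val finalizedAt = windowEnd.plus(java.time.Duration.ofMinutes(7)) // 10:12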
Questions:
Upvotes: 3
Views: 1433
Reputation: 11
df.read.format("delta")
It looks like you are creating a static DataFrame and then converting this static DataFrame into a streaming one. Aggregations are applied to the static DataFrame, and windowing might not work for this reason. Try creating a streaming DataFrame instead:
val df = spark
  .readStream
  .format("delta")...
Some examples can be found here: https://docs.databricks.com/delta/delta-streaming.html#delta-table-as-a-stream-source
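A fuller sketch along those lines might look like this (the paths, the count aggregation, the trigger interval and the eventTime timestamp assumption are mine, not taken from your job):

import org.apache.spark.sql.functions.window
import org.apache.spark.sql.streaming.Trigger
import spark.implicits._

val streamDf = spark
  .readStream
  .format("delta")
  .load("<source url>")                        // assumed Delta source path

streamDf
  .withWatermark("eventTime", "7 minutes")     // assumes eventTime is (or was cast to) a timestamp
  .groupBy($"date", $"hour", $"objectId", window($"eventTime", "5 minutes"))
  .count()                                     // placeholder aggregation
  .writeStream
  .format("delta")
  .outputMode("append")                        // append mode lets expired window state be dropped
  .option("checkpointLocation", "<checkpoint url>")
  .partitionBy("date", "hour")
  .trigger(Trigger.ProcessingTime("1 minute")) // assumed trigger interval
  .start("<destination url>")
  .awaitTermination()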
Upvotes: 1