PiyushC
PiyushC

Reputation: 308

Periodic processing time spikes in spark structured streaming

I am wondering why every 4th batch of my spark streaming application has a huge spike.

Some details

  1. This is sateful processing using rocksdb state store
  2. Reading from Kafka with 180 partitions
  3. Writing to Kafka with 12 partitions
  4. Set 300 shuffle partitions
  5. Using spark 3.3.3 with python

Spark UI screenshot

I think it might be related to checkpointing to s3. But couldn't find a setting to change how often to checkpoint to external storage.

I have tried adjusting the cluster size, but that didn't seem to help. Didn't try anything methodical though.

Upvotes: 0

Views: 535

Answers (1)

Jonathan
Jonathan

Reputation: 2068

Assuming you use the default trigger and max offsets of trigger (i.e. trigger = unspecified (default) and maxOffsetsPerTrigger = None).

Your statistics show that spikes are not caused by the resource insufficiency or data volume spikes, as there is no saturation in process rate, no linear growth in both input rows and batch duration, stable input rate.

As your spikes are mostly caused by addBatch operation, if you don't have any special data and trigger special transformation in every 4th batch, also every writing are the same, I suspect it's caused by the metadata management. For example, reading a huge compacted metadata log to find the latest batch id (this issue was fixed).

You can check if your RocksDB has any setting on metadata, also check the SQL queries history to see if the reading or writing part trigger the long metadata operations.


Edit 1 on 2024-01-18

I think it's not related to what kind of storage you use to store your state if you're talking about the spike. It's more like an issue on how to minimize the batch duration caused by the metadata management and comparing different storage.

To answer your question, you should figure out what kind of scenario cause your spike and you might refer to below posts:

  • Lots of small metadata file: post
  • Large compacted metadata handling: post, post and post

Just to remind you that some of the configuration are internal (for example spark.sql.streaming.minBatchesToRetain mail), you might not find it on the spark configuration official website. Before that, you could try to increase the executor memory (not adding extra executor) and it might help.

Upvotes: 1

Related Questions