Eswaramoorthy P

Reputation: 45

How to avoid queuing up of Batches in spark streaming

I have a Spark Streaming job that uses the Kafka direct stream, with the following configuration:

Batch interval 60s

spark.streaming.kafka.maxRatePerPartition 42

auto.offset.reset earliest

Because I start the streaming job with the earliest option, I set spark.streaming.kafka.maxRatePerPartition to 42 so that it consumes messages from Kafka faster and reduces the lag. It should therefore consume 42 records/s x 60 s x 60 partitions = 151200 records per batch.

I have two questions:

  1. The first couple of batches consumed 151200 records as expected, but the count gradually dropped in later batches even though there are still a lot of records to consume from Kafka. Please see the screenshot below. What could be the reason?
  2. Batches are getting queued up a lot. How can we avoid this?

Is it possible to achieve the following scenario? We have a batch interval of 60s. If each batch finishes within 60s, the next batch can start on time. If a batch takes longer than 60s, we don't want the next batch to be queued. Instead, as soon as the running batch completes, the next one should start and pick up all records that have arrived until that time. This way we won't have lag and we won't have queued-up batches.

Spark UI - Screenshot for question 1

Upvotes: 2

Views: 2345

Answers (1)

Michael Heil

Reputation: 18475

What you have observed is the expected behavior of Spark's backpressure mechanism.

You have set spark.streaming.kafka.maxRatePerPartition to 42 and, as you have calculated, the job will start by fetching

42 * 60 partitions * 60 seconds batch interval = 151200 records per batch
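As a quick check of that arithmetic, here is a minimal Python sketch. The function name `max_records_per_batch` is made up for illustration. It only multiplies the three numbers from the question and is not part of any Spark API.

```python
def max_records_per_batch(max_rate_per_partition: int,
                          partitions: int,
                          batch_interval_s: int) -> int:
    """Upper bound on the records one batch may fetch under a per-partition rate cap."""
    # The cap is expressed in records per second per partition,
    # so scale it by the partition count and the batch length in seconds.
    return max_rate_per_partition * partitions * batch_interval_s


print(max_records_per_batch(42, 60, 60))  # 151200
```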

Judging by the processing times in your attached screenshot, the job does start with that number of records.

However, because it takes longer than 60 seconds to process all 151200 records, the backpressure mechanism reduces the number of input records in subsequent batches. This happens only after a few batches because the backpressure mechanism (aka the "PID controller") has to wait until the very first batch has finished before it can use that experience to estimate the number of input records for the next interval. As already mentioned, processing the first 151200 records took longer than one interval, which means the following two intervals had already been scheduled at the full maxRatePerPartition, without the experience of a finished batch.

That is why you see the number of input records decreasing only in the fourth batch. Even then, there are still too many input records to process within 60 seconds, so the job builds up more and more delay. The PID controller (backpressure) eventually realises that it is lagging too far behind and drastically reduces the number of input records to the minimum, which is set by spark.streaming.backpressure.pid.minRate. In your case this value seems to be set to 2, which leads to 2 * 60 * 60 = 7200 records per batch interval.
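To make the feedback loop more concrete, here is a simplified Python sketch of a PID-style rate update, modeled loosely on Spark's rate estimator. Treat it as an illustration under assumptions rather than Spark's actual code: the function name, the gain defaults (1.0, 0.2, 0.0), the floor of 100 records/s and the example timings are all my assumptions, and the sketch works on the total rate across all partitions.

```python
def next_rate(latest_rate, num_records, processing_delay_ms, scheduling_delay_ms,
              batch_interval_ms, prev_error=0.0, seconds_since_update=60.0,
              kp=1.0, ki=0.2, kd=0.0, min_rate=100.0):
    """Return (new_rate, error) in records/s after one finished batch."""
    # Rate the job actually sustained while processing the finished batch.
    processing_rate = num_records * 1000.0 / processing_delay_ms
    # Proportional term: how far the ingestion rate overshot that capacity.
    error = latest_rate - processing_rate
    # Integral term: backlog implied by the scheduling delay, expressed as a rate.
    historical_error = scheduling_delay_ms * processing_rate / batch_interval_ms
    # Derivative term: how quickly the error changed since the last update.
    d_error = (error - prev_error) / seconds_since_update
    new_rate = latest_rate - kp * error - ki * historical_error - kd * d_error
    # Never go below the configured floor.
    return max(new_rate, min_rate), error


# Hypothetical first batch: 151200 records took 120 s, no queueing yet.
rate, err = next_rate(42 * 60, 151200, 120_000, 0, 60_000)
print(rate)  # 1260.0

# Hypothetical later batch: same throughput, but 10 minutes of scheduling delay.
rate, err = next_rate(42 * 60, 151200, 120_000, 600_000, 60_000)
print(rate)  # 100.0
```

In the first call the estimate halves because the batch took twice the interval. In the second, the accumulated scheduling delay dominates and pushes the estimate down to the floor, which corresponds to the sharp drop described above.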

To summarize, what you observe is the expected and intended behavior. The streaming job needs a few batches to learn how much data it should fetch from Kafka to fit into the given (non-flexible) batch interval of 60 seconds. No matter how long the processing within a batch takes, your streaming job will plan the next batch every 60 seconds sharp.

What you could do:

  • It is recommended to set maxRatePerPartition to about 150-200% of the actual capacity. Just let the job run a bit longer and you will see what the estimated 100% is.
  • With 60 partitions in Kafka, you need to ensure the data is distributed evenly across the partitions. Only then will maxRatePerPartition do what you intend it to do.
  • With 60 partitions, you may use 60 cores in your Spark cluster to get the maximum consumption speed.
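The first bullet can be turned into a rough calculation. The sketch below is only an illustration: the function name and the sample numbers (90000 records processed in 60 seconds) are hypothetical, so substitute values read from your own Spark UI once the job has stabilised. It also assumes the records are spread evenly over the partitions, as the second bullet requires.

```python
def suggest_max_rate_per_partition(records: int,
                                   processing_s: float,
                                   partitions: int,
                                   headroom: float = 1.5) -> float:
    """Per-partition rate cap set to `headroom` times the measured capacity."""
    # Measured capacity in records per second per partition,
    # assuming an even spread of records across partitions.
    capacity = records / processing_s / partitions
    return capacity * headroom


# Hypothetical steady state: 90000 records processed in 60 s on 60 partitions.
low = suggest_max_rate_per_partition(90000, 60, 60, headroom=1.5)
high = suggest_max_rate_per_partition(90000, 60, 60, headroom=2.0)
print(low, high)  # 37.5 50.0
```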

Upvotes: 4
