Reputation: 45
I have a Spark Streaming job using the Kafka direct stream approach, with the below config:
Batch interval 60s
spark.streaming.kafka.maxRatePerPartition 42
auto.offset.reset earliest
Since I am starting the stream with the earliest option, and to consume messages from Kafka faster and reduce the lag, I set spark.streaming.kafka.maxRatePerPartition to 42. So it should consume 42 records/s x 60s x 60 partitions = 151200 records per batch.
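For reference, a minimal sketch of my setup (the broker address, topic name, group id, and app name are placeholders):

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

val conf = new SparkConf()
  .setAppName("direct-stream-example") // placeholder
  // cap the fetch at 42 records per second and partition
  .set("spark.streaming.kafka.maxRatePerPartition", "42")

// 60s batch interval
val ssc = new StreamingContext(conf, Seconds(60))

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "broker:9092", // placeholder
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "my-consumer-group",    // placeholder
  "auto.offset.reset" -> "earliest"
)

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](Seq("my-topic"), kafkaParams) // placeholder topic
)

// ... process the stream, then ssc.start() / ssc.awaitTermination()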
I have two questions here:
Is it possible to achieve the following scenario: we have a batch interval of 60s; if each batch finishes within 60s, the next batch can start on time. If a batch takes more than 60s, we don't want the next batch to be queued. As soon as the running batch completes, the next run should start, picking up the records that have arrived until that time. That way we would have no lag and also no queued-up batches.
Spark UI - Screenshot for question 1
Upvotes: 2
Views: 2345
Reputation: 18475
What you have observed is the expected behavior of Spark's backpressure mechanism.
You have set the configuration spark.streaming.kafka.maxRatePerPartition to 42 and, as you have calculated, the job will start fetching

42 records/s * 60 partitions * 60 seconds batch interval = 151200 records per batch
Looking at the processing times in your attached screenshot, the job indeed starts with that number of records.
However, as it takes longer than 60 seconds to process all those 151200 records, the backpressure mechanism will reduce the number of input records in subsequent batches. This happens only after a few batches, because the backpressure mechanism (aka the "PID controller") needs to wait until the very first batch has finished so it can use that experience to estimate the number of input records for the next interval. As already mentioned, processing the first 151200 records took longer than one interval, which means the subsequent two intervals had already been scheduled with maxRatePerPartition, without the experience of a finished batch interval.
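For context, the PID controller only kicks in when backpressure is switched on; a minimal sketch of the relevant settings, assuming your job sets something like this:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  // Enables the PID rate estimator; without it, every batch would
  // simply be scheduled with maxRatePerPartition.
  .set("spark.streaming.backpressure.enabled", "true")
  // Hard upper bound: 42 records per second and partition, no matter
  // what rate the PID controller estimates.
  .set("spark.streaming.kafka.maxRatePerPartition", "42")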
That is why you see the input records decreasing only in the fourth batch. At that point, the number of input records is still too high to be processed within 60 seconds, so the job builds up more and more delay, and the PID controller (backpressure) finally realises that it is lagging too many records behind and drastically reduces the number of input records to the minimum, which is set by spark.streaming.backpressure.pid.minRate. In your case this value seems to be set to 2, which leads to 2 * 60 * 60 = 7200 records per batch interval.
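To make the arithmetic explicit, here is a small sketch (the helper is purely illustrative, not part of any Spark API):

// records per batch = rate per partition and second * partitions * batch seconds
def recordsPerBatch(ratePerPartitionPerSec: Long, partitions: Int, batchSeconds: Int): Long =
  ratePerPartitionPerSec * partitions * batchSeconds

recordsPerBatch(42, 60, 60) // = 151200, the first batches, capped by maxRatePerPartition
recordsPerBatch(2, 60, 60)  // = 7200, after backpressure falls back to pid.minRate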
To summarize, what you observe is the expected and intended behavior. The streaming job needs a few batches to learn how much data it should fetch from Kafka to fit into the given (non-flexible) batch interval of 60 seconds. No matter how long the processing time of a batch is, your streaming job will plan the next batch every 60 seconds sharp.
What you could do: set maxRatePerPartition to about 150-200% of the actual capacity. Just let the job run a bit longer and you will see what the estimated 100% will be. For example, if the stabilized batches settle around 20 records per second and partition, a setting of roughly 30-40 would be in that range.
Upvotes: 4