MR_K

Reputation: 159

Rebalance issue with Spring Kafka max.poll.interval.ms, max.poll.records and idleTimeBetweenPolls

I am seeing continuous rebalancing in my application. The application uses a batch listener, and these are the configuration properties I have added:

myapp.consumer.group.id=cg-id-local
myapp.changefeed.topic=test_topic
myapp.auto.offset.reset=latest
myapp.enable.auto.commit=false
myapp.max.poll.interval.ms=300000
myapp.max.poll.records=20000
myapp.idle.time.between.polls=240000
myapp.concurrency=10
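The `consumerFactory(poSummaryCGID)` method is not shown. As a hypothetical sketch, assuming it builds a plain property map for Spring Kafka's `DefaultKafkaConsumerFactory`, the `myapp.*` values would map onto the standard Kafka consumer config keys roughly as below. The broker address and the String deserializers are assumptions. The idle time and the concurrency are deliberately absent: they are not Kafka consumer configs but container settings, applied through the factory code that follows.

```java
import java.util.HashMap;
import java.util.Map;

class ConsumerProps {

    // Hypothetical helper: builds the raw Kafka consumer properties that a
    // consumerFactory(...) method might pass to DefaultKafkaConsumerFactory.
    // Keys are standard Kafka consumer config names; values mirror myapp.* above.
    static Map<String, Object> consumerConfigs(String groupId) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", "localhost:9092"); // assumption: broker address
        props.put("group.id", groupId);                   // myapp.consumer.group.id
        props.put("auto.offset.reset", "latest");         // myapp.auto.offset.reset
        props.put("enable.auto.commit", false);           // myapp.enable.auto.commit
        props.put("max.poll.interval.ms", 300000);        // myapp.max.poll.interval.ms
        props.put("max.poll.records", 20000);             // myapp.max.poll.records (applied per consumer)
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // idle.time.between.polls and concurrency are NOT consumer configs; they are
        // set on the container factory (setIdleBetweenPolls / setConcurrency).
        return props;
    }

    public static void main(String[] args) {
        consumerConfigs("cg-id-local").forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```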

Container factory:

        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory(poSummaryCGID));
        factory.setConcurrency(poSummNoOfConsumers);
        factory.setBatchListener(true);
        factory.setAckDiscarded(true);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        factory.getContainerProperties().setIdleBetweenPolls(idleTimeBetweenPolls);

I have a few questions:

  1. I have set the maximum number of records per poll to 20000 (with polls every 4 minutes), and the topic has 10 partitions. Since I set the concurrency to 10, 10 consumers will be running and each will listen to 1 partition. Will the record count be split across all the consumers, so that each consumer handles 2000 records?

  2. max.poll.interval.ms is set to 5 minutes. I am sure that each consumer can process 2000 records (if my understanding above is correct) within the 4-minute poll interval, which is below the max.poll.interval.ms upper bound. So why is rebalancing happening? Are there any other configuration properties I need to set?
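For reference, a simplified, range-style model of how one topic's partitions are divided among the consumers of a group, which is what "10 consumers, 1 partition each" relies on. This is a sketch in the spirit of Kafka's RangeAssignor, not Kafka's actual assignor; the strategy actually used depends on the `partition.assignment.strategy` consumer config, and the class and method names here are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

class RangeAssignmentSketch {

    // Simplified range-style assignment for a single topic: partitions are split
    // into contiguous ranges, and the first (partitions % consumers) consumers get
    // one extra partition. Consumers beyond the partition count get nothing.
    static List<List<Integer>> assign(int partitions, int consumers) {
        List<List<Integer>> result = new ArrayList<>();
        int perConsumer = partitions / consumers;
        int extra = partitions % consumers;
        int next = 0;
        for (int i = 0; i < consumers; i++) {
            int count = perConsumer + (i < extra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int j = 0; j < count; j++) {
                owned.add(next++);
            }
            result.add(owned);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(assign(10, 10)); // concurrency 10: one partition per consumer
        System.out.println(assign(10, 12)); // concurrency 12: the last two consumers sit idle
    }
}
```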

Any help would be greatly appreciated!

I also tried these configurations:

myapp.max.poll.interval.ms=600000
myapp.max.poll.records=2000
myapp.idle.time.between.polls=360000

myapp.max.poll.interval.ms=300000
myapp.max.poll.records=2000
myapp.idle.time.between.polls=300000

myapp.max.poll.interval.ms=300000
myapp.max.poll.records=2000
myapp.idle.time.between.polls=180000

EDIT (fix): myapp.max.poll.interval.ms must always be greater than (myapp.idle.time.between.polls + the time needed to process myapp.max.poll.records records).
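The rule in the EDIT can be checked against the three configurations tried above. This is a minimal sketch: it assumes, hypothetically, that one 2000-record batch takes 90 seconds to process (substitute a measured value), and the class and method names are illustrative only.

```java
class PollBudget {

    // The EDIT's rule: the gap between two poll() calls is roughly the batch
    // processing time plus the idle time, and it must stay below max.poll.interval.ms.
    static boolean staysInGroup(long maxPollIntervalMs, long idleBetweenPollsMs, long processingMs) {
        return maxPollIntervalMs > idleBetweenPollsMs + processingMs;
    }

    // Processing time left over once the idle time is subtracted (0 if none).
    static long processingBudgetMs(long maxPollIntervalMs, long idleBetweenPollsMs) {
        return Math.max(0, maxPollIntervalMs - idleBetweenPollsMs);
    }

    public static void main(String[] args) {
        long assumedProcessingMs = 90_000; // hypothetical: time to process one 2000-record batch
        long[][] configs = {
            {600_000, 360_000}, // {max.poll.interval.ms, idle.time.between.polls}
            {300_000, 300_000},
            {300_000, 180_000},
        };
        for (long[] c : configs) {
            System.out.printf("interval=%d idle=%d budget=%d ok=%b%n",
                    c[0], c[1], processingBudgetMs(c[0], c[1]),
                    staysInGroup(c[0], c[1], assumedProcessingMs));
        }
    }
}
```

The second configuration leaves no processing budget at all, so it violates the rule regardless of how fast the batch is processed.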

Upvotes: 0

Views: 3831

Answers (1)

Gary Russell

Reputation: 174554

No. max.poll.records is per consumer, not per topic or container.

If you have concurrency=10 and 10 partitions, you should reduce max.poll.records to 2000 so that each consumer gets a maximum of 2000 records per poll.
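To make the arithmetic concrete: each of the 10 consumers applies `max.poll.records` on its own, so the container-wide upper bound per poll cycle is the concurrency times `max.poll.records`. A minimal sketch (class and method names are hypothetical):

```java
class RecordsPerPoll {

    // max.poll.records is enforced by each KafkaConsumer independently, so the
    // upper bound across the whole container is concurrency * max.poll.records.
    static long maxRecordsAcrossContainer(int concurrency, int maxPollRecords) {
        return (long) concurrency * maxPollRecords;
    }

    public static void main(String[] args) {
        System.out.println(maxRecordsAcrossContainer(10, 20_000)); // 200000: original setting
        System.out.println(maxRecordsAcrossContainer(10, 2_000));  // 20000: suggested setting
    }
}
```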

The container will automatically reduce the idle time between polls so that max.poll.interval.ms won't be exceeded, but you should be conservative with these properties (max.poll.records and max.poll.interval.ms) so that it is never possible to exceed the interval.
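A simplified model of the clamping described above, under stated assumptions: the method name and the 5-second safety margin are illustrative, not taken from the spring-kafka API (check the container source for the exact behavior). It also shows why clamping cannot rescue the consumer once processing alone exceeds max.poll.interval.ms: the idle time drops to zero, but the interval has already been exceeded.

```java
class IdleClamp {

    // Hypothetical model: before sleeping for idleBetweenPolls, shorten the sleep
    // so the next poll() still happens before max.poll.interval.ms has elapsed
    // since the last poll. safetyMarginMs is an assumed value.
    static long effectiveIdleMs(long idleBetweenPollsMs, long maxPollIntervalMs,
                                long elapsedSinceLastPollMs, long safetyMarginMs) {
        long remaining = maxPollIntervalMs - elapsedSinceLastPollMs - safetyMarginMs;
        return Math.max(0, Math.min(idleBetweenPollsMs, remaining));
    }

    public static void main(String[] args) {
        // max.poll.interval.ms=300000, idle=240000, the batch took 100 s to process
        System.out.println(effectiveIdleMs(240_000, 300_000, 100_000, 5_000)); // 195000
        // the batch took 310 s: no idle time, but the interval is already exceeded
        System.out.println(effectiveIdleMs(240_000, 300_000, 310_000, 5_000)); // 0
    }
}
```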

Upvotes: 2
