Reputation: 159
I am seeing the continuous rebalancing on my application. My application is developed in batch mode and here are configuration properties which have been added.
myapp.consumer.group.id= cg-id-local
myapp.changefeed.topic= test_topic
myapp.auto.offset.reset=latest
myapp.enable.auto.commit=false
myapp.max.poll.interval.ms=300000
myapp.max.poll.records= 20000
myapp.idle.time.between.polls=240000
myapp.concurrency = 10
container factory:
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory(poSummaryCGID));
factory.setConcurrency(poSummNoOfConsumers);
factory.setBatchListener(true);
factory.setAckDiscarded(true);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
factory.getContainerProperties().setIdleBetweenPolls(idleTimeBetweenPolls);
I have few Questions here:
I have setup the maximum record count per poll(4 min) is 20000 and we have 10 partitions in a TOPIC. Since i setup the concurrency as 10, so 10 consumers will up and running and each will listen to 1 partition. My question here is, does the record count will be split across all the consumers like each consumer can handle 2000 records ?
The max.poll.interval.ms has been setup with 5 min. I am sure that the consumer will process 2000(if my above understanding is correct) records in a given poll interval(4 min) which is less than max.poll.interval.ms which has upper bound limit. But not sure why rebalancing is happening? are there any other configuration properties i need to setup ?
Help would be greatly appreciated!!
Tried with these configurations:
myapp.max.poll.interval.ms=600000
myapp.max.poll.records= 2000
myapp.idle.time.between.polls=360000
myapp.max.poll.interval.ms=300000
myapp.max.poll.records= 2000
myapp.idle.time.between.polls=300000
myapp.max.poll.interval.ms=300000
myapp.max.poll.records= 2000
myapp.idle.time.between.polls=180000
EDIT FIX : We should always myapp.max.poll.interval.ms > (myapp.idle.time.between.polls + myapp.max.poll.records processing time).
Upvotes: 0
Views: 3831
Reputation: 174554
No. max.poll.records
is per consumer, not per topic or container.
If you have concurrency=10 and 10 partitions you should reduce max.poll.records
to 2000 so that each consumer gets a max of 2000 per poll.
The container will automatically reduce the idle between polls so that the max.poll.interval.ms
won't be exceeded, but you should be conservative with these properties (max.poll.records
and max.poll.interval.ms
) such that it will never be possible to exceed the interval.
Upvotes: 2