I am facing a weird rebalancing issue with my Kafka consumer. I have implemented an infinite retry policy for errors using SeekToCurrentErrorHandler, set max.poll.interval.ms=1200000 (i.e. 20 minutes) and set the retry delay to 900000 ms (15 minutes).
As you can see, max.poll.interval.ms is greater than the retry delay. The problem is that when a new consumer is added, the group rebalances and the old consumer leaves the group, so only the new consumer receives and processes messages. In the new consumer I see the log "Attempt to heartbeat failed since group is rebalancing", but it still processes messages. The old consumer receives no data at all.
My consumer config:
spring:
  kafka:
    ......
    ......
    consumer:
      max.poll.records: 150
      group.id: xxxxx
      properties:
        enable.auto.commit: false
        max.poll.interval.ms: 1200000 # 20 minutes, greater than the retry interval
Kafka Java config:
public ConcurrentKafkaListenerContainerFactory<String, byte[]> kafkaFactory() {
    ConcurrentKafkaListenerContainerFactory<String, byte[]> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    ......
    ......
    factory.setErrorHandler(kafkaErrorHandler);
    factory.setRetryTemplate(retryTemplate());
    // stateful retry: each failure is rethrown to the container and its error handler
    factory.setStatefulRetry(true);
    factory.getContainerProperties().setAckOnError(false);
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
    return factory;
}
Error Handler:
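@Component
public class KafkaErrorHandler extends SeekToCurrentErrorHandler {

    private static final Logger LOG = LoggerFactory.getLogger(KafkaErrorHandler.class);

    public KafkaErrorHandler() {
        super(-1); // a negative maxFailures is treated as "retry forever"
    }

    @Override
    public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records,
            Consumer<?, ?> consumer, MessageListenerContainer container) {
        LOG.info("handle");
        // seeks back to the failed record so it (and the rest of the batch) is redelivered
        super.handle(thrownException, records, consumer, container);
    }
}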
My app takes around 10 s to process each event and acknowledges each one only after it has been processed successfully. Each poll fetches up to 150 messages, as configured, and the max poll interval is set to 20 minutes. Does Kafka poll again only after all 150 messages have been processed?
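A quick back-of-the-envelope check of the numbers in the question, written as a small standalone Java class. It relies on the fact that the Spring listener container calls poll() again only after the listener has handled every record returned by the previous poll, and it assumes (my assumption, not stated in the question) that records are handled one after another on the consumer thread and that the 15-minute retry delay is also spent on that thread before the next poll. `PollBudget` and its methods are hypothetical names.

```java
/**
 * Back-of-the-envelope check: does one poll's worth of work fit inside
 * max.poll.interval.ms? Assumes sequential processing on the consumer thread
 * and that a retry delay is spent on that same thread before the next poll.
 */
public class PollBudget {

    /** Time between two polls, in ms, when every record in the batch succeeds. */
    static long batchMillis(int maxPollRecords, long perRecordMillis) {
        return maxPollRecords * perRecordMillis;
    }

    /**
     * Time between two polls, in ms, when the record at 1-based position failedAt
     * fails: the records before it, the failed attempt itself, then the retry delay.
     */
    static long batchMillisWithOneRetry(int failedAt, long perRecordMillis, long retryDelayMillis) {
        return failedAt * perRecordMillis + retryDelayMillis;
    }

    public static void main(String[] args) {
        long maxPollIntervalMs = 1_200_000L; // max.poll.interval.ms from the question (20 min)
        long perRecordMs = 10_000L;          // ~10 s per event, from the question
        long retryDelayMs = 900_000L;        // 15-minute retry delay, from the question

        long full = batchMillis(150, perRecordMs);
        System.out.println("150 records: " + full + " ms, exceeds limit: " + (full > maxPollIntervalMs));

        long retry = batchMillisWithOneRetry(31, perRecordMs, retryDelayMs);
        System.out.println("fail at #31: " + retry + " ms, exceeds limit: " + (retry > maxPollIntervalMs));
    }
}
```

Under these assumptions, a full batch of 150 records takes about 1,500,000 ms (25 minutes), which on its own is already longer than the 20-minute max.poll.interval.ms; a failure as early as the 31st record plus one 15-minute delay also goes over the limit.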
Reputation: 174729
With stateful retry, the exception is thrown to the container, and we re-seek the unprocessed records so that they are fetched again by the next poll.
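To illustrate what re-seeking means, here is a toy, single-partition model in plain Java. It is not Spring Kafka's implementation, and `SeekToCurrentModel` and `run` are hypothetical names: when a record fails, the rest of the batch is discarded and the position is moved back to the failed offset, so the next "poll" starts from that record again.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Toy model of "seek to current" on a single partition (not Spring Kafka code). */
public class SeekToCurrentModel {

    /** Returns the offsets in the order the listener was handed them. */
    static List<Long> run(long endOffset, int maxPollRecords, Set<Long> failOnce) {
        List<Long> seen = new ArrayList<>();
        Set<Long> alreadyFailed = new HashSet<>();
        long position = 0; // next offset the "consumer" will fetch
        while (position < endOffset) {
            long batchEnd = Math.min(position + maxPollRecords, endOffset); // one poll
            long next = batchEnd; // where the next poll starts if the whole batch succeeds
            for (long offset = position; offset < batchEnd; offset++) {
                seen.add(offset);
                // fail only on the first delivery of an offset listed in failOnce
                if (failOnce.contains(offset) && alreadyFailed.add(offset)) {
                    next = offset; // "seek" back to the failed record
                    break;         // the remaining records are fetched again by the next poll
                }
            }
            position = next;
        }
        return seen;
    }

    public static void main(String[] args) {
        // Offsets 0..4, up to 3 records per poll, offset 1 fails on its first attempt.
        System.out.println(run(5, 3, Set.of(1L))); // prints [0, 1, 1, 2, 3, 4]
    }
}
```

Offset 2 is fetched by the first poll but never handed to the listener in that batch; after the seek, the second poll starts again at offset 1.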
DEBUG logging should help you figure out what's wrong.
With newer versions (2.3 and later), you can configure the SeekToCurrentErrorHandler with a BackOff instead of a retry template.
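A minimal sketch of that setup, assuming spring-kafka 2.3 or later and a `ConsumerFactory<String, byte[]>` bean; the class name `KafkaBackOffConfig` is hypothetical. It drops the RetryTemplate and stateful retry and lets the error handler retry the failed record indefinitely with a 15-minute `FixedBackOff`. As far as I know the back-off still sleeps on the consumer thread between attempts, so the interval has to stay below max.poll.interval.ms, just like the retry template's delay.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

@Configuration
public class KafkaBackOffConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, byte[]> kafkaFactory(
            ConsumerFactory<String, byte[]> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, byte[]> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        // No RetryTemplate / stateful retry: the error handler re-seeks and retries.
        // Wait 15 minutes between attempts and never give up.
        factory.setErrorHandler(new SeekToCurrentErrorHandler(
                new FixedBackOff(900_000L, FixedBackOff.UNLIMITED_ATTEMPTS)));
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }
}
```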
Adding a new consumer while the current one is busy means the rebalance is delayed until the first consumer polls again. The existing consumer won't "leave the group" unless it times out, i.e. goes longer than max.poll.interval.ms between polls.
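One way to observe when the rebalance actually reaches each instance is to register a `ConsumerRebalanceListener` that logs revocations and assignments. The class name `LoggingRebalanceListener` is hypothetical; the callbacks are the standard Kafka client interface, and the client invokes them on the consumer thread from inside poll(), so on a busy consumer they only fire once it polls again.

```java
import java.util.Collection;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Logs partition revocation and assignment so rebalance timing shows up per instance. */
public class LoggingRebalanceListener implements ConsumerRebalanceListener {

    private static final Logger LOG = LoggerFactory.getLogger(LoggingRebalanceListener.class);

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Runs inside poll() on the consumer thread, before the partitions are reassigned.
        LOG.info("Partitions revoked: {}", partitions);
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Runs inside poll() on the consumer thread, after the rebalance completes.
        LOG.info("Partitions assigned: {}", partitions);
    }
}
```

It can be registered on the factory with `factory.getContainerProperties().setConsumerRebalanceListener(new LoggingRebalanceListener());`.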