Tilak

Reputation: 363

Spring kafka MessageListener and max.poll.records

I am using Spring Kafka 2.7.8 to consume messages from Kafka. The consumer listener is shown below:

@KafkaListener(topics = "topicName",
               groupId = "groupId",
               containerFactory = "kafkaListenerFactory")
public void onMessage(ConsumerRecord<?, ?> record) {
    // process a single record here
}

The onMessage method above receives a single message at a time.

Does this mean that the Spring library sets max.poll.records to 1, or does it poll 500 records at a time (the default value) and pass them to the method one by one?

The reason for this question is that we often see the errors below together in production. We received all four errors for multiple consumers in under a minute. I am trying to understand whether this is due to an intermittent Kafka broker connectivity issue or due to load. Please advise.

Seek to current after exception; nested exception is org.apache.kafka.clients.consumer.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.

seek to current after exception; nested exception is org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before successfully committing offsets {topic-9=OffsetAndMetadata{offset=2729058, leaderEpoch=null, metadata=''}}

[Consumer clientId=consumer-groupName-5, groupId=consumer] Offset commit failed on partition topic-33 at offset 2729191: The coordinator is not aware of this member.

Seek to current after exception; nested exception is org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records

Upvotes: 3

Views: 13358

Answers (1)

Gary Russell

Reputation: 174484

Spring does not change max.poll.records; it takes the default (or whatever you set it to). The records returned by a poll are handed to the listener one at a time, and the next poll happens only after all of them have been processed.
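
To illustrate the behaviour described above, here is a minimal, simplified model in plain Java. It is not Spring's actual container code: `PollLoopSketch`, `poll` and `run` are hypothetical names, and an in-memory queue stands in for a partition. It only shows that each poll returns up to max.poll.records records and that the listener is called once per record before the next poll.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

class PollLoopSketch {

    // Stand-in for consumer.poll(): returns at most maxPollRecords records.
    static List<String> poll(Queue<String> partition, int maxPollRecords) {
        List<String> batch = new ArrayList<>();
        while (batch.size() < maxPollRecords && !partition.isEmpty()) {
            batch.add(partition.remove());
        }
        return batch;
    }

    // Simplified container loop: poll a batch, hand records to the listener
    // one at a time, then poll again. Returns the number of polls performed.
    static int run(Queue<String> partition, int maxPollRecords, Consumer<String> listener) {
        int polls = 0;
        while (!partition.isEmpty()) {
            List<String> batch = poll(partition, maxPollRecords);
            polls++;
            for (String record : batch) {
                listener.accept(record); // one record per listener call
            }
        }
        return polls;
    }

    public static void main(String[] args) {
        Queue<String> partition = new ArrayDeque<>();
        for (int i = 0; i < 1200; i++) {
            partition.add("record-" + i);
        }
        List<String> seen = new ArrayList<>();
        int polls = run(partition, 500, seen::add);
        // 1200 records at 500 per poll: batches of 500, 500 and 200.
        System.out.println(polls + " polls, " + seen.size() + " listener calls");
    }
}
```

In this model the time between two polls is the sum of the listener's processing times for the whole batch, which is the quantity that max.poll.interval.ms bounds.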

This means that your listener must be able to process max.poll.records within max.poll.interval.ms.
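
As a rough back-of-the-envelope check of that constraint, the sketch below works out the time budget per record. With the Kafka defaults (max.poll.records = 500, max.poll.interval.ms = 300000) the listener has on average 600 ms per record. The class and method names, the 2-second worst-case processing time and the safety factor of 2 are assumptions made purely for illustration.

```java
class PollBudget {

    // Average time available per record so that a full batch is processed
    // before max.poll.interval.ms expires.
    static long perRecordBudgetMs(int maxPollRecords, long maxPollIntervalMs) {
        return maxPollIntervalMs / maxPollRecords;
    }

    // Largest max.poll.records that still fits in the interval, given a
    // worst-case per-record processing time and a safety factor (margin).
    static int safeMaxPollRecords(long maxPollIntervalMs, long worstCaseRecordMs, double safetyFactor) {
        long usableMs = (long) (maxPollIntervalMs / safetyFactor);
        return (int) Math.max(1, usableMs / worstCaseRecordMs);
    }

    public static void main(String[] args) {
        // Kafka defaults: 500 records per poll, 5 minutes between polls.
        System.out.println(perRecordBudgetMs(500, 300_000));         // 600
        // Hypothetical listener that can take up to 2 s per record.
        System.out.println(safeMaxPollRecords(300_000, 2_000, 2.0)); // 75
    }
}
```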

You need to reduce max.poll.records and/or increase max.poll.interval.ms so that you can process the records in that time, with a generous margin, to avoid these rebalances.
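
A minimal sketch of what that tuning could look like as consumer properties. The values 50 and 600000 are illustrative only, and the bootstrap address is a placeholder. Plain string keys are used so the snippet runs without the Kafka client on the classpath; in a real application you would typically use the `ConsumerConfig.MAX_POLL_RECORDS_CONFIG` and `ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG` constants and pass the map to the `DefaultKafkaConsumerFactory` behind your container factory.

```java
import java.util.HashMap;
import java.util.Map;

class ConsumerTuningSketch {

    // Builds consumer properties with a smaller batch size and a longer
    // allowed gap between polls.
    static Map<String, Object> consumerProps(int maxPollRecords, int maxPollIntervalMs) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder address
        props.put("group.id", "groupId");
        props.put("max.poll.records", maxPollRecords);         // default is 500
        props.put("max.poll.interval.ms", maxPollIntervalMs);  // default is 300000
        return props;
    }

    public static void main(String[] args) {
        // Illustrative values: 50 records per poll, 10 minutes to process them.
        Map<String, Object> props = consumerProps(50, 600_000);
        System.out.println(props.get("max.poll.records"));
        System.out.println(props.get("max.poll.interval.ms"));
    }
}
```

With these example values the listener would have 12 seconds per record on average, compared with 600 ms under the defaults.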

Upvotes: 9
