trippl

Reputation: 31

Kafka consumer not automatically reconnecting after outage

In our infrastructure we run Kafka with 3 nodes and have several Spring Boot services running in OpenShift. Some of the communication between the services happens via Kafka. For the consumers/listeners we use the @KafkaListener Spring annotation with a unique group ID, so that each instance (pod) consumes all the partitions of a topic:

@KafkaListener(topics = "myTopic", groupId = "group#{T(java.util.UUID).randomUUID().toString()}")
public void handleMessage(String message) {
    doStuffWithMessage(message);
}

For the configuration we use mostly the default values. For the consumers, all we have is:

spring.kafka.consumer:
  auto-offset-reset: latest
  value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  key-deserializer: org.apache.kafka.common.serialization.StringDeserializer

Sometimes all of our Kafka nodes are briefly down at the same time, which results in the consumers unregistering, as logged by org.apache.kafka.common.utils.AppInfoParser:

App info kafka.consumer for consumer-group5c327050-5b05-46fb-a7be-c8d8a20d293a-1 unregistered

Once the nodes are up again, we would expect the consumers to register again, but that is not the case. So far we have no idea why they fail to do so. For now we are forced to restart the affected pods whenever this issue occurs. Has anybody had a similar issue before, or does anyone have an idea what we might be doing wrong?

Edit: We are using the following versions

Upvotes: 2

Views: 16838

Answers (2)

trippl

Reputation: 31

We did some more digging in our logs and found the underlying issue that causes the consumer(s) to be stopped.

Authentication/Authorization Exception and no authExceptionRetryInterval set

So apparently the consumer gets an authentication/authorization exception when it tries to reconnect to the currently unavailable Kafka nodes. Since we did not set authExceptionRetryInterval, there are no retries and the consumer (listener container) is stopped. From the documentation at https://docs.spring.io/spring-kafka/api/org/springframework/kafka/listener/ConsumerProperties.html#setAuthExceptionRetryInterval(java.time.Duration):

Set the interval between retries after an AuthenticationException or org.apache.kafka.common.errors.AuthorizationException is thrown by KafkaConsumer. By default the field is null and retries are disabled. In such case the container will be stopped. The interval must be less than max.poll.interval.ms consumer property.

We are quite confident that setting authExceptionRetryInterval will solve our problem.
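For anyone looking for where to set this, here is a minimal sketch of one way to do it in a Spring Boot application. It assumes spring-kafka 2.8 or later (where the setter is named setAuthExceptionRetryInterval) and Spring Boot's Kafka auto-configuration. The class name KafkaRetryConfig and the 10 second interval are made up for illustration; pick any interval below max.poll.interval.ms.

```java
import java.time.Duration;

import org.springframework.boot.autoconfigure.kafka.ConcurrentKafkaListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;

// Hypothetical configuration class; the name is illustrative only.
@Configuration
public class KafkaRetryConfig {

    // Replaces Boot's default "kafkaListenerContainerFactory" bean, which is
    // the factory that @KafkaListener methods use unless told otherwise.
    @Bean
    public ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ConsumerFactory<Object, Object> kafkaConsumerFactory) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        // Apply the usual spring.kafka.* settings first.
        configurer.configure(factory, kafkaConsumerFactory);
        // Assumed value: retry every 10 seconds after an authentication or
        // authorization exception instead of stopping the container.
        // Must be less than max.poll.interval.ms.
        factory.getContainerProperties()
                .setAuthExceptionRetryInterval(Duration.ofSeconds(10));
        return factory;
    }
}
```

With this in place the existing @KafkaListener method needs no changes, since it picks up the factory bean by its default name.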

Upvotes: 1

monkeyStix

Reputation: 630

In the Kafka client config you can use the reconnect.backoff.max.ms parameter to set the maximum number of milliseconds to wait between reconnection attempts to a broker that has repeatedly failed to connect. Additionally, set reconnect.backoff.ms to the base number of milliseconds to wait before retrying to connect.

If provided, the backoff per host will increase exponentially for each consecutive connection failure, up to this maximum.

Kafka documentation https://kafka.apache.org/31/documentation/#streamsconfigs

If you set the maximum to something fairly high, like a day, the client keeps retrying with exponentially increasing intervals, starting at reconnect.backoff.ms (50 ms by default), until the wait between attempts reaches that maximum.
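To make the interplay of the two settings concrete, here is a small standalone sketch of a capped exponential backoff schedule. It is not Kafka's code: it assumes the wait doubles after each failed attempt and leaves out the random jitter that the client adds, and the class and method names are made up. The values 50 and 1000 are the documented defaults for reconnect.backoff.ms and reconnect.backoff.max.ms.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: models a doubling backoff capped at a maximum.
public class ReconnectBackoffSketch {

    // Returns the wait time in ms before each of the first `attempts` retries.
    static List<Long> schedule(long baseMs, long maxMs, int attempts) {
        List<Long> waits = new ArrayList<>();
        long wait = baseMs;
        for (int i = 0; i < attempts; i++) {
            waits.add(Math.min(wait, maxMs));
            // Stop doubling once the cap is reached, which also avoids overflow.
            if (wait < maxMs) {
                wait *= 2;
            }
        }
        return waits;
    }

    public static void main(String[] args) {
        // Assumed inputs: the default base (50 ms) and default maximum (1000 ms).
        System.out.println(schedule(50, 1000, 7));
        // prints [50, 100, 200, 400, 800, 1000, 1000]
    }
}
```

Under these assumptions, raising reconnect.backoff.max.ms does not limit how long the client keeps trying. It only lets the pause between attempts grow larger before it levels off.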

Upvotes: 1
