Reputation: 31
In our infrastructure we are running Kafka with 3 nodes and have several Spring Boot services running in OpenShift. Some of the communication between the services happens via Kafka. For the consumers/listeners we use Spring's @KafkaListener annotation with a unique group ID, so that each instance (pod) consumes all partitions of a topic:
@KafkaListener(topics = "myTopic", groupId = "group#{T(java.util.UUID).randomUUID().toString()}")
public void handleMessage(String message) {
    doStuffWithMessage(message);
}
For the configuration we are using pretty much the default values. For the consumers, all we have is:
spring.kafka.consumer:
  auto-offset-reset: latest
  value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
Sometimes we face the unfortunate situation where all of our Kafka nodes are briefly down, which results in the consumers unregistering, as logged by org.apache.kafka.common.utils.AppInfoParser:
App info kafka.consumer for consumer-group5c327050-5b05-46fb-a7be-c8d8a20d293a-1 unregistered
Once the nodes are up again, we would expect the consumers to register again, but that is not the case. So far we have no idea why they fail to do so. For now we are forced to restart the affected pods whenever this issue occurs. Has anybody had a similar issue before, or does anyone have an idea what we might be doing wrong?
Edit: We are using the following versions
Upvotes: 2
Views: 16838
Reputation: 31
We did some more digging in our logs and found the underlying issue that causes the consumer(s) to be stopped.
Authentication/Authorization Exception and no authExceptionRetryInterval set
So apparently the consumer gets an Authentication/Authorization Exception when trying to reconnect to the currently unavailable Kafka nodes. Since we did not set authExceptionRetryInterval, there are no retries and the consumer (listener container) is stopped. From the Javadoc at https://docs.spring.io/spring-kafka/api/org/springframework/kafka/listener/ConsumerProperties.html#setAuthExceptionRetryInterval(java.time.Duration):
Set the interval between retries after an AuthenticationException or org.apache.kafka.common.errors.AuthorizationException is thrown by KafkaConsumer. By default the field is null and retries are disabled. In such case the container will be stopped. The interval must be less than max.poll.interval.ms consumer property.
We are quite confident that setting authExceptionRetryInterval will solve our problem.
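For anyone landing here, a minimal sketch of how the interval could be set in a Spring Boot app. This is an illustration and not necessarily our exact setup: the class name KafkaListenerRetryConfig and the 10 second interval are assumptions, and it assumes Spring Boot's Kafka auto-configuration is on the classpath so that the configurer still applies the spring.kafka.* properties.

```java
import java.time.Duration;

import org.springframework.boot.autoconfigure.kafka.ConcurrentKafkaListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;

// Hypothetical configuration class; the name is only illustrative.
@Configuration
public class KafkaListenerRetryConfig {

    // Declaring a bean named "kafkaListenerContainerFactory" replaces the
    // auto-configured factory that @KafkaListener uses by default.
    @Bean
    public ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ConsumerFactory<Object, Object> kafkaConsumerFactory) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        // Apply the usual spring.kafka.* settings first.
        configurer.configure(factory, kafkaConsumerFactory);
        // Retry after authentication/authorization exceptions instead of
        // stopping the container. 10 seconds is an assumed value; it must stay
        // below the max.poll.interval.ms consumer property.
        factory.getContainerProperties().setAuthExceptionRetryInterval(Duration.ofSeconds(10));
        return factory;
    }
}
```

With this in place the existing @KafkaListener methods should not need any change, since they pick up the factory by its default bean name.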
Upvotes: 1
Reputation: 630
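In the Kafka client config you can use the reconnect.backoff.max.ms parameter to set the maximum number of milliseconds to wait between reconnection attempts to a broker that has repeatedly failed to connect. Additionally, set the parameter reconnect.backoff.ms to the base number of milliseconds to wait before retrying to connect. From the documentation for reconnect.backoff.max.ms: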
If provided, the backoff per host will increase exponentially for each consecutive connection failure, up to this maximum.
Kafka documentation: https://kafka.apache.org/31/documentation/#streamsconfigs
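If you set the maximum to something fairly high, like a day, the wait between attempts to a given host can keep growing with each consecutive failure until it reaches a day (with increasing intervals starting from reconnect.backoff.ms, e.g. 50, 100, 200, 400 ms and so on, plus some random jitter). Note that the client keeps retrying either way; the maximum only caps how long it waits between attempts.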
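To get a feel for how the two settings interact, here is a rough sketch that prints the wait before each reconnect attempt. It is not the Kafka client's actual code: the class ReconnectBackoffSketch and the method backoffMs are made up for illustration, it assumes the wait doubles after each consecutive failure, and it leaves out the random jitter the client adds.

```java
public class ReconnectBackoffSketch {

    // Wait (in ms) before the next reconnect attempt after `failures`
    // consecutive connection failures. Assumes doubling per failure and
    // ignores jitter, so real client values will differ slightly.
    public static long backoffMs(long baseMs, long maxMs, int failures) {
        long wait = Math.min(baseMs, maxMs);
        for (int i = 0; i < failures; i++) {
            // Stop at the cap; checking against maxMs / 2 also avoids overflow.
            if (wait > maxMs / 2) {
                return maxMs;
            }
            wait *= 2;
        }
        return wait;
    }

    public static void main(String[] args) {
        long baseMs = 50;   // reconnect.backoff.ms (Kafka default)
        long maxMs = 1000;  // reconnect.backoff.max.ms (Kafka default)
        // Prints 50, 100, 200, 400, 800, 1000, 1000 (one value per line).
        for (int failures = 0; failures <= 6; failures++) {
            System.out.println("after " + failures + " failures: "
                    + backoffMs(baseMs, maxMs, failures) + " ms");
        }
    }
}
```

Raising maxMs only changes where the sequence levels off, so a very high value mostly means long gaps between attempts once the brokers have been down for a while.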
Upvotes: 1