Reputation: 153
I have a Spring Boot application with 10 consumers, each consuming from a different topic. All 10 consumers use the same consumer group. After the application has been running for a while (about 5 hours), the consumers start rebalancing, and at times the rebalance never finishes, so after a while no messages are received at all.
Here is my consumer configuration.
public Map<String, Object> consumerConfigs(String clientID) {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
    props.put(SECURITY_PROTOCOL, securityProtocol);
    props.put(SASL_MECHANISM, saslMechanism);
    props.put(SASL_JAAS_CONFIG,
            String.format("%s required username=\"%s\" password=\"%s\" ;", loginModule, username, kafkaSecretPass));
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5);
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);
    props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientID);
    props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
            "org.apache.kafka.clients.consumer.RoundRobinAssignor");
    //props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 120000);
    //props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 60000);
    //props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600000);
    //props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 2097164);
    //props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 30000);
    return props;
}
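As an aside, the rebalance-related timeouts in this config interact: Kafka's rule of thumb is that heartbeat.interval.ms should be no more than one third of session.timeout.ms, and note that the commented-out HEARTBEAT_INTERVAL_MS_CONFIG of 30000 is larger than the active SESSION_TIMEOUT_MS_CONFIG of 10000, so re-enabling it as-is would make the consumer miss session timeouts. A minimal sketch of that rule (plain Java, no Kafka dependency; the numbers are illustrative, not recommendations):

```java
// Sketch: sanity-check the relationship between rebalance-related timeouts.
public class RebalanceTimeouts {

    // Kafka's rule of thumb: heartbeat.interval.ms should be at most
    // one third of session.timeout.ms.
    static boolean heartbeatOk(int heartbeatIntervalMs, int sessionTimeoutMs) {
        return heartbeatIntervalMs * 3 <= sessionTimeoutMs;
    }

    public static void main(String[] args) {
        // Values from the config above: session.timeout.ms = 10000,
        // commented-out heartbeat.interval.ms = 30000 -> inconsistent.
        System.out.println(heartbeatOk(30000, 10000)); // prints false
        // A consistent pair: 10 s heartbeat against a 30 s session timeout.
        System.out.println(heartbeatOk(10000, 30000)); // prints true
    }
}
```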
Here is the error I see after the consumer has been running for a while.
[Consumer clientId=consumer-one, groupId=group-1] (Re-)joining group
This attempt to rejoin the group goes on for a long time, and eventually the consumer is never able to join the group.
Here is how one of the consumers consumes its topic and sends the records for processing.
@EventListener(ApplicationStartedEvent.class)
public Disposable consume() {
    return reactiveKafkaConsumerTemplate
            .receive()
            .concatMap(consumerRecord -> {
                log.info("Received event:customer offset {} and value {}",
                        consumerRecord.offset(), new String(consumerRecord.value(), StandardCharsets.UTF_8));
                return updateCustomer(consumerRecord).flatMap(success -> {
                    consumerRecord.receiverOffset().acknowledge();
                    return Mono.just("Received");
                });
            })
            .retryWhen(Retry.backoff(3, Duration.ofSeconds(2)).transientErrors(true))
            .onErrorResume(e -> {
                log.info("Customer - Receiver On Error Resume");
                return Mono.empty();
            })
            .repeat()
            .subscribe(suc -> log.info("Subscribe successfully in Customer"),
                    err -> log.error("Error occurred during subscribe Customer, " + err));
}
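One way to reason about whether this processing pipeline can stall the consumer: the time between polls must stay under max.poll.interval.ms, and each poll hands back up to max.poll.records records, so the average per-record processing time has a rough budget. A sketch of that arithmetic (plain Java; 300000 ms is the Kafka default for max.poll.interval.ms, which applies here since the config above leaves that setting commented out):

```java
public class PollBudget {

    // Rough upper bound on average processing time per record before the
    // consumer exceeds max.poll.interval.ms and is removed from the group.
    static int perRecordBudgetMs(int maxPollIntervalMs, int maxPollRecords) {
        return maxPollIntervalMs / maxPollRecords;
    }

    public static void main(String[] args) {
        // 300000 ms is the Kafka default; max.poll.records = 5 as configured.
        System.out.println(perRecordBudgetMs(300_000, 5)); // prints 60000
    }
}
```

If updateCustomer can take longer than that budget per record (e.g. a slow downstream call), the group coordinator will evict the consumer and trigger exactly this kind of repeated rebalance.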
Upvotes: 3
Views: 5811
Reputation: 647
Frequent rebalances are usually caused by the consumer taking too long to process a batch. While the consumer is busy processing and heartbeats are not being acknowledged in time, the broker assumes the consumer is lost and starts a rebalance.
I would suggest either creating smaller batches by reducing the value of max.partition.fetch.bytes, or extending the heartbeat interval by increasing the value of heartbeat.interval.ms.
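The two suggestions above could be sketched as consumer properties like this (plain string keys so the snippet has no Kafka dependency; the concrete values are illustrative starting points, not tuned recommendations):

```java
import java.util.HashMap;
import java.util.Map;

public class RebalanceTuning {

    static Map<String, Object> tuned() {
        Map<String, Object> props = new HashMap<>();
        // Smaller batches: fetch less data per partition per request
        // (the Kafka default is 1 MiB = 1048576 bytes).
        props.put("max.partition.fetch.bytes", 262144);
        // Longer session timeout with heartbeats kept at no more than
        // one third of it, per Kafka's rule of thumb.
        props.put("session.timeout.ms", 30000);
        props.put("heartbeat.interval.ms", 10000);
        return props;
    }

    public static void main(String[] args) {
        System.out.println(tuned().get("max.partition.fetch.bytes")); // prints 262144
    }
}
```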
Upvotes: 4