Srikant Barik


Kafka Rejoining Group Quite Frequently

I have a Spring Boot application with 10 different consumers, each consuming messages from a different topic (10 topics in total). All 10 consumers use the same consumer group. After the application has been running for a while (about 5 hours), I see the consumers trying to rebalance, and sometimes the rebalance never finishes, so the consumers stop receiving messages.

Here is my consumer configuration.

public Map<String, Object> consumerConfigs(String clientID) {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
    props.put(SECURITY_PROTOCOL, securityProtocol);
    props.put(SASL_MECHANISM, saslMechanism);
    props.put(SASL_JAAS_CONFIG,
            String.format("%s required username=\"%s\" password=\"%s\" ;", loginModule, username, kafkaSecretPass));

    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5);
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);
    props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientID);
    props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
            "org.apache.kafka.clients.consumer.RoundRobinAssignor");
    // props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 120000);
    // props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 60000);
    // props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600000);
    // props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 2097164);
    // props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 30000);
    return props;
}
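The commented-out lines would set heartbeat.interval.ms to 30000 while session.timeout.ms stays at 10000. The Kafka consumer documentation requires heartbeat.interval.ms to be lower than session.timeout.ms, and recommends keeping it at no more than one third of it. The sketch below checks that relationship for a set of values. The class and method names (ConsumerTimeoutCheck, check) are hypothetical helpers, not part of kafka-clients, and nothing here talks to a broker.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class ConsumerTimeoutCheck {

    // Hypothetical helper: checks the heartbeat/session relationship described
    // in the Kafka consumer configuration docs. Both keys must be present.
    static List<String> check(Map<String, Integer> props) {
        List<String> problems = new ArrayList<>();
        int session = props.get("session.timeout.ms");
        int heartbeat = props.get("heartbeat.interval.ms");
        if (heartbeat >= session) {
            problems.add("heartbeat.interval.ms must be lower than session.timeout.ms");
        } else if (heartbeat * 3 > session) {
            problems.add("heartbeat.interval.ms should be at most 1/3 of session.timeout.ms");
        }
        return problems;
    }

    public static void main(String[] args) {
        // Values from the question (session) and its commented-out line (heartbeat)
        Map<String, Integer> asCommented = Map.of(
                "session.timeout.ms", 10000,
                "heartbeat.interval.ms", 30000);
        // An assumed, internally consistent alternative
        Map<String, Integer> consistent = Map.of(
                "session.timeout.ms", 30000,
                "heartbeat.interval.ms", 10000);
        System.out.println(check(asCommented)); // one problem reported
        System.out.println(check(consistent));  // []
    }
}
```

In other words, raising heartbeat.interval.ms on its own does not give the consumer more time; it only makes heartbeats less frequent.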

Here is the log message I see repeatedly after the consumer has been running for a while:

[Consumer clientId=consumer-one, groupId=group-1] (Re-)joining group

The consumer keeps trying to rejoin the group for a long time and eventually never manages to join it.

Here is how one of the consumers consumes its topic and sends each record for processing:

@EventListener(ApplicationStartedEvent.class)
public Disposable consume() {
    return reactiveKafkaConsumerTemplate
            .receive()
            // Process records one at a time, in order
            .concatMap(consumerRecord -> {
                log.info("Received event: customer offset {} and value {}",
                        consumerRecord.offset(), new String(consumerRecord.value(), StandardCharsets.UTF_8));
                return updateCustomer(consumerRecord).flatMap(success -> {
                    // Mark the offset as processed only after a successful update
                    consumerRecord.receiverOffset().acknowledge();
                    return Mono.just("Received");
                });
            })
            .retryWhen(Retry.backoff(3, Duration.ofSeconds(2)).transientErrors(true))
            .onErrorResume(e -> {
                log.info("Customer - Receiver On Error Resume");
                return Mono.empty();
            })
            // Resubscribe to receive() whenever the stream completes
            .repeat()
            .subscribe(suc -> log.info("Subscribe successfully in Customer"),
                    err -> log.error("Error occurred during subscribe Customer", err));
}

Upvotes: 3


Answers (1)

Umeshwaran


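Frequent rebalances are usually caused by the consumer taking too long to process a batch of records. If the consumer does not come back for the next batch in time, the group considers it lost and starts a rebalance. Note that since Kafka 0.10.1 heartbeats are sent from a background thread, so slow processing normally exceeds max.poll.interval.ms (the maximum delay allowed between two poll() calls) rather than session.timeout.ms (which only requires heartbeats to keep arriving).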
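The two liveness conditions can be pictured with a simplified model: a member stays in the group only while its last heartbeat is within session.timeout.ms and its last poll() is within max.poll.interval.ms. This is an illustrative sketch, not Kafka's actual implementation; the class and method names are hypothetical, and 300000 ms is the documented default of max.poll.interval.ms.

```java
public class LivenessModel {

    // Simplified model (an assumption, not Kafka's real code):
    // a member is kept only while both conditions hold.
    static boolean isAlive(long nowMs, long lastHeartbeatMs, long lastPollMs,
                           long sessionTimeoutMs, long maxPollIntervalMs) {
        boolean heartbeatOk = nowMs - lastHeartbeatMs <= sessionTimeoutMs;
        boolean pollOk = nowMs - lastPollMs <= maxPollIntervalMs;
        return heartbeatOk && pollOk;
    }

    public static void main(String[] args) {
        long session = 10_000;   // session.timeout.ms from the question
        long maxPoll = 300_000;  // default max.poll.interval.ms

        // Heartbeats are fine, but the last poll() was 6 minutes ago
        System.out.println(isAlive(400_000, 399_000, 40_000, session, maxPoll));  // false
        // Heartbeats stopped 12 s ago (e.g. a long GC pause or network issue)
        System.out.println(isAlive(400_000, 388_000, 399_000, session, maxPoll)); // false
        // Both within their limits
        System.out.println(isAlive(400_000, 397_000, 350_000, session, maxPoll)); // true
    }
}
```

The first case is the "slow processing" scenario: the background heartbeats alone do not keep the member in the group.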

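I would suggest either creating smaller batches, by reducing max.partition.fetch.bytes (or max.poll.records), or giving each batch more time by increasing max.poll.interval.ms. If heartbeats really are being missed, increase session.timeout.ms rather than heartbeat.interval.ms: heartbeat.interval.ms must stay lower than session.timeout.ms and should typically be no higher than one third of it.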
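A rough way to size batches is to compare the worst-case time spent on one batch (records per poll multiplied by per-record processing time) with max.poll.interval.ms. The sketch below does that arithmetic; the 2000 ms per-record cost and the 0.5 safety margin are assumed figures, and the class and method names are hypothetical.

```java
public class PollBudget {

    // Worst-case time between two poll() calls if every record in the
    // batch is processed sequentially and takes perRecordMs.
    static long worstCaseBatchMs(int maxPollRecords, long perRecordMs) {
        return maxPollRecords * perRecordMs;
    }

    // Largest max.poll.records whose worst case fits within a fraction
    // (margin, e.g. 0.5) of max.poll.interval.ms.
    static int maxRecordsThatFit(long maxPollIntervalMs, long perRecordMs, double margin) {
        return (int) ((maxPollIntervalMs * margin) / perRecordMs);
    }

    public static void main(String[] args) {
        long perRecordMs = 2_000; // assumed processing time for one record
        System.out.println(worstCaseBatchMs(5, perRecordMs));             // 10000
        System.out.println(maxRecordsThatFit(300_000, perRecordMs, 0.5)); // 75
    }
}
```

If the measured per-record time is much larger (for example, a slow downstream call), the same arithmetic shows how far max.poll.records must drop or max.poll.interval.ms must grow.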

For more info: More info

Upvotes: 4
