Simon Su

Reputation: 2343

CommitFailedException Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member

I was using Kafka 0.10.2 and have now hit a CommitFailedException like:

Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.

I have set max.poll.interval.ms to Integer.MAX_VALUE, so can anyone tell me why this still happens even though I have set that value?

Another question: as the message suggests, I set session.timeout.ms to 60000 and it still happens. I tried to reproduce it with some simple code:

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.log4j.Logger;

public class KafkaConsumer10 {

    public static void main(String[] args) throws InterruptedException {
        Logger logger = Logger.getLogger(KafkaConsumer10.class);
        logger.info("XX");

        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka-broker:9098");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("max.poll.interval.ms", "300000");
        props.put("session.timeout.ms", "10000");
        props.put("max.poll.records", "2");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("t1"));

        while (true) {
            // Deliberately sleep longer than session.timeout.ms before and after poll()
            Thread.sleep(11000);
            ConsumerRecords<String, String> records = consumer.poll(100);
            Thread.sleep(11000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n",
                        record.offset(), record.key(), record.value());
            }
        }
    }
}

When I set session.timeout.ms to 10000 and sleep more than 10000 ms inside my poll loop, it still seems to work and no exception is thrown, so I'm confused. If the heartbeat is triggered by consumer.poll() and consumer.commit(), then the heartbeat interval in my code clearly exceeds the session timeout. Why is no CommitFailedException thrown?

Upvotes: 51

Views: 92356

Answers (3)

Abhimanyu

Reputation: 2740

For this you need to handle the rebalance in your code: process the in-flight messages and commit their offsets before the rebalance hands the partitions to another member.

Like this:

private class HandleRebalance implements ConsumerRebalanceListener {
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Implement whatever you want to do once rebalancing is done.
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Commit the offsets of the records processed so far
        // (e.g. consumer.commitSync(currentOffsets)) before the partitions are reassigned.
    }
}

and use this syntax to subscribe to the topic:

kafkaConsumer.subscribe(topicNameList, new HandleRebalance());

The advantages of doing this:

  1. Messages will not be reprocessed when rebalancing takes place.

  2. No commit-failed exception.
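Putting the pieces together, here is a minimal sketch of how the listener is typically wired up. It assumes manual commits (enable.auto.commit=false) with a currentOffsets map tracking processed records; the class name, topic, and bootstrap servers are illustrative and not from the original answer:

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class RebalanceAwareConsumer {

    // Offsets of records processed so far; committed when partitions are revoked.
    private static final Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
    private static KafkaConsumer<String, String> consumer;

    private static class HandleRebalance implements ConsumerRebalanceListener {
        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            // Nothing to do in this sketch.
        }

        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            // Commit what has been processed before the partitions move to another member.
            consumer.commitSync(currentOffsets);
        }
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka-broker:9092"); // illustrative
        props.put("group.id", "test");
        props.put("enable.auto.commit", "false");            // commit manually
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("t1"), new HandleRebalance());

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                // ... process the record here ...
                currentOffsets.put(new TopicPartition(record.topic(), record.partition()),
                        new OffsetAndMetadata(record.offset() + 1));
            }
            consumer.commitSync(currentOffsets);
        }
    }
}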

Upvotes: 15

Xia Gong

Reputation: 1

If you are also writing tests like me, you can use that group id to consume one message with a shell command, which eliminates the error temporarily. For a long-term solution, refer to the other answers. For example:

./kafka-console-consumer.sh --bootstrap-server xxx:9092 --topic yyy --group zzz --max-messages 1 

Upvotes: 0

Rahul Teke

Reputation: 241

session.timeout.ms set on the consumer should be less than the group.max.session.timeout.ms set on the Kafka broker.

This resolved the issue for me.
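For illustration, here is a minimal sketch of the consumer side of this constraint. The 60000 ms value, the class name, and the broker defaults quoted in the comment are assumptions for the 0.10.x line, not something this answer states:

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class SessionTimeoutConfig {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");

        // Must fall between the broker's group.min.session.timeout.ms and
        // group.max.session.timeout.ms (defaults in the 0.10.x line are 6000 ms
        // and 300000 ms); otherwise the consumer is rejected when joining the group.
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "60000");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // ... subscribe and poll as usual ...
        consumer.close();
    }
}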

Credit to the GitHub issue Commit Failures.

Upvotes: 24
