rpli

Reputation: 73

Kafka: Increased partitions could not be assigned in next rebalance

I ran into something strange with Kafka rebalancing. If I increase the partitions of a topic that is subscribed to by several Java consumers (all in the same group), no consumer rebalance occurs. After that, I tried to trigger a rebalance by starting a new consumer (or killing one), but the newly added partitions were not assigned in this rebalance. I found that the new partitions were only assigned after I stopped all consumers and started them again. I don't know whether this is normal or whether there is an explanation for it.

Below is my test on my computer:

1. Start Kafka and ZooKeeper. Create a normal topic (test-topic) with 1 partition

./bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --create --topic test-topic --partitions 1 --replication-factor 1 --config retention.ms=604800000

2. Start 2 Java consumers (C1, C2) subscribing to test-topic

3. Increase the partitions of test-topic from 1 to 3

./bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --alter --topic test-topic --partitions 3

No rebalance occurs in C1 or C2.

4. Start a new consumer C3 subscribing to test-topic. A rebalance occurs, but only partition test-topic-0 is involved in the reassignment; neither test-topic-1 nor test-topic-2 is involved.

5. Try to trigger a rebalance by stopping C2 and C3. However, test-topic-1 and test-topic-2 are still not assigned.

6. Stop all running consumers, and then start them again. All partitions test-topic-0,1,2 are assigned normally.
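To see which partitions a consumer holds at each step, one simple check (besides the rebalance listener used in the code below) is to log consumer.assignment() after each poll:

log.info("current assignment: {}", consumer.assignment());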

Kafka & Java API version: kafka_2.12-2.0.0 (I also tried kafka_2.11-1.0.0 and kafka_2.10-0.10.2.1 with the same result)

ZooKeeper: 3.4.13

Consumer code:

import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaConsumerThread extends Thread {
    private static final Logger log = LoggerFactory.getLogger(KafkaConsumerThread.class);

    // fields referenced by run()
    private final KafkaConsumer<String, String> consumer;
    private final String topicName;
    private final ConsumerRebalanceListener consumerRebalanceListener;
    private final AtomicLong receivedRecordNumber = new AtomicLong();
    private volatile boolean stop = false;

    // constructor (initialization was omitted in my original snippet)
    public KafkaConsumerThread(String groupName, String kafkaBootstrap, String topicName,
                               ConsumerRebalanceListener consumerRebalanceListener) {
        this.consumer = createNativeConsumer(groupName, kafkaBootstrap);
        this.topicName = topicName;
        this.consumerRebalanceListener = consumerRebalanceListener;
    }

    // consumer settings
    public static KafkaConsumer<String, String> createNativeConsumer(String groupName, String kafkaBootstrap) {
        Properties props = new Properties();
        props.put("bootstrap.servers", kafkaBootstrap);
        props.put("group.id", groupName);
        props.put("auto.offset.reset", "earliest");
        props.put("enable.auto.commit", "true");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return new KafkaConsumer<>(props);
    }

    @Override
    public void run() {
        log.info("Start consumer ..");
        consumer.subscribe(Collections.singleton(topicName), consumerRebalanceListener);
        while (!stop) {
            try {
                ConsumerRecords<String, String> records = consumer.poll(100);
                receivedRecordNumber.addAndGet(records.count());
                for (ConsumerRecord<String, String> record : records) {
                    log.info("Receive [key:{}][value:{}]", record.key(), record.value());
                }
            } catch (TimeoutException e) {
                log.info("no data");
            }
        }
        consumer.close();
    }
}
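The consumerRebalanceListener passed to subscribe() above is only there to log assignment changes. My exact listener is not shown, but a minimal sketch looks like this (requires java.util.Collection and org.apache.kafka.common.TopicPartition):

ConsumerRebalanceListener consumerRebalanceListener = new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // called before a rebalance: these partitions are taken away from this consumer
        log.info("Partitions revoked: {}", partitions);
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // called after a rebalance: these partitions are now owned by this consumer
        log.info("Partitions assigned: {}", partitions);
    }
};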

Thanks for @Aftab Virtual's comment. I tested again and waited longer. About 5 minutes after the first consumer started, a rebalance was automatically triggered and all partitions test-topic-0,1,2 were reassigned. Therefore, Kafka does have an automatic rebalance after partitions are altered.

Furthermore, I followed @Aftab Virtual's advice and changed leader.imbalance.check.interval.seconds to 30. However, the rebalance involving all partitions occurred about 3 minutes after the partitions were increased. I did add these settings for the broker:

auto.leader.rebalance.enable = true
leader.imbalance.check.interval.seconds = 30

I don't know what the mechanism behind this rebalance is, and there are no further logs for it beyond these:

[2018-10-18 11:32:47,958] INFO [GroupCoordinator 0]: Preparing to rebalance group test-group with old generation 4 (__consumer_offsets-12) (kafka.coordinator.group.GroupCoordinator)
[2018-10-18 11:32:50,963] INFO [GroupCoordinator 0]: Stabilized group test-group generation 5 (__consumer_offsets-12) (kafka.coordinator.group.GroupCoordinator)
[2018-10-18 11:32:50,964] INFO [GroupCoordinator 0]: Assignment received from leader for group test-group for generation 5 (kafka.coordinator.group.GroupCoordinator)

Upvotes: 1

Views: 2191

Answers (1)

rpli

Reputation: 73

After seeking advice from the Kafka team and some Kafka users, I got the explanation for my test result. It's not a bug.

Increasing the partitions marks metadata.updateNeeded=true. However, this does not actually trigger an update until the next metadata expiry (the default metadata.max.age.ms is 5*60*1000 ms). Until the consumer that is the group leader updates its metadata, a rebalance caused by a change in the number of consumers will not involve the new partitions.

I decreased metadata.max.age.ms to 30 seconds, and Kafka became more sensitive to partition increases.
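For example, set it alongside the other consumer properties:

props.put("metadata.max.age.ms", "30000");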

Upvotes: 4
