Reputation: 73
I ran into strange behavior with Kafka rebalancing. If I increase the partitions of a topic that is subscribed by several Java consumers (all in the same group), no consumer rebalance occurs. After that, I tried to trigger a rebalance by starting a new consumer (or killing one), but the newly added partitions were not assigned in that rebalance. I found that the new partitions were only assigned after I stopped all the consumers and restarted them. I don't know whether this is normal, or whether there is an explanation for it.
Below is my test on my computer:
1. Start Kafka and ZooKeeper. Create a normal topic (test-topic) with 1 partition:
./bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --create --topic test-topic --partitions 1 --replication-factor 1 --config retention.ms=604800000
2. Start 2 Java consumers (C1, C2) subscribing to test-topic.
3. Increase the partitions of test-topic to 3:
./bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --alter --topic test-topic --partitions 3
No rebalance occurs in C1 or C2.
4. Start a new consumer C3 subscribing to test-topic. A rebalance occurs, but only partition test-topic-0 is involved in the reassignment; neither test-topic-1 nor test-topic-2 is involved.
5. I tried to trigger a rebalance by stopping C2 and C3. However, test-topic-1 and test-topic-2 were still not assigned.
6. Stop all running consumers and then start them again. All of test-topic-0,1,2 are assigned normally.
Kafka & Java API version: kafka_2.12-2.0.0 (I also tried kafka_2.11-1.0.0 and kafka_2.10-0.10.2.1, with the same result)
ZooKeeper: 3.4.13
consumer code:
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaConsumerThread extends Thread {

    private static final Logger log = LoggerFactory.getLogger(KafkaConsumerThread.class);

    private final KafkaConsumer<String, String> consumer;
    private final String topicName;
    private final ConsumerRebalanceListener consumerRebalanceListener;
    private final AtomicInteger receivedRecordNumber = new AtomicInteger();
    private volatile boolean stop = false;

    public KafkaConsumerThread(KafkaConsumer<String, String> consumer, String topicName,
                               ConsumerRebalanceListener consumerRebalanceListener) {
        this.consumer = consumer;
        this.topicName = topicName;
        this.consumerRebalanceListener = consumerRebalanceListener;
    }

    // consumer settings
    public static KafkaConsumer<String, String> createNativeConsumer(String groupName, String kafkaBootstrap) {
        Properties props = new Properties();
        props.put("bootstrap.servers", kafkaBootstrap);
        props.put("group.id", groupName);
        props.put("auto.offset.reset", "earliest");
        props.put("enable.auto.commit", true);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return new KafkaConsumer<>(props);
    }

    @Override
    public void run() {
        log.info("Start consumer ..");
        consumer.subscribe(Collections.singleton(topicName), consumerRebalanceListener);
        while (!stop) {
            try {
                ConsumerRecords<String, String> records = consumer.poll(100);
                receivedRecordNumber.addAndGet(records.count());
                for (ConsumerRecord<String, String> record : records) {
                    log.info("Receive [key:{}][value:{}]", record.key(), record.value());
                }
            } catch (TimeoutException e) {
                log.info("no data");
            }
        }
        consumer.close();
    }
}
Thanks for @Aftab Virtual's comment. I tested again and waited longer. About 5 minutes after the first consumer started, a rebalance was automatically triggered and all partitions test-topic-0,1,2 were reassigned. So Kafka does perform an automatic rebalance after partitions are altered.
Furthermore, I followed @Aftab Virtual's advice and changed leader.imbalance.check.interval.seconds to 30. However, the rebalance involving all partitions occurred about 3 minutes after the partition increase. I did add these settings for the broker:
auto.leader.rebalance.enable = true
leader.imbalance.check.interval.seconds = 30
I don't know what the mechanism behind this rebalance is, and there are no logs for it other than:
[2018-10-18 11:32:47,958] INFO [GroupCoordinator 0]: Preparing to rebalance group test-group with old generation 4 (__consumer_offsets-12) (kafka.coordinator.group.GroupCoordinator)
[2018-10-18 11:32:50,963] INFO [GroupCoordinator 0]: Stabilized group test-group generation 5 (__consumer_offsets-12) (kafka.coordinator.group.GroupCoordinator)
[2018-10-18 11:32:50,964] INFO [GroupCoordinator 0]: Assignment received from leader for group test-group for generation 5 (kafka.coordinator.group.GroupCoordinator)
Upvotes: 1
Views: 2191
Reputation: 73
After seeking advice from the Kafka team and some Kafka users, I got the explanation for my test result. It's not a bug.
Increasing the partitions marks metadata.updateNeeded=true. However, this does not actually trigger an update until the next metadata expiry time (the default metadata.max.age.ms is 5 * 60 * 1000 ms). Before the leader of the group updates its metadata, a rebalance caused by a change in the number of consumers will not involve the new partitions.
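This metadata aging can be illustrated with a simplified model (my own sketch, not Kafka's actual client code): the consumer keeps serving a cached partition count until the cache is older than metadata.max.age.ms, so a rebalance that fires before the expiry still only sees the old partitions.

```java
// Simplified model of consumer metadata aging. Not real Kafka client code;
// class and method names here are illustrative only.
public class MetadataAgeModel {
    private final long maxAgeMs;     // corresponds to metadata.max.age.ms
    private long lastRefreshMs;      // when the cached metadata was last fetched
    private int cachedPartitions;    // partition count the consumer believes exists

    public MetadataAgeModel(long maxAgeMs, long nowMs, int partitions) {
        this.maxAgeMs = maxAgeMs;
        this.lastRefreshMs = nowMs;
        this.cachedPartitions = partitions;
    }

    // Returns the partition count visible to the consumer at time nowMs.
    // The cache is only refreshed once it is older than maxAgeMs.
    public int partitionsSeenAt(long nowMs, int actualPartitions) {
        if (nowMs - lastRefreshMs >= maxAgeMs) {
            cachedPartitions = actualPartitions;
            lastRefreshMs = nowMs;
        }
        return cachedPartitions;
    }
}
```

With the default 5-minute max age, a rebalance triggered one minute after the alter still assigns only the old partitions, because the group leader's cached metadata has not yet expired.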
I decreased metadata.max.age.ms to 30 seconds and Kafka became more sensitive to partition increases.
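A minimal sketch of the resulting consumer properties (the bootstrap address and group id are placeholders; metadata.max.age.ms is the standard Kafka consumer config):

```java
import java.util.Properties;

public class LowMetadataAgeProps {
    // Consumer properties with a shorter metadata refresh interval, so newly
    // added partitions are discovered within ~30 s instead of the 5-minute default.
    public static Properties build(String bootstrap, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrap);
        props.put("group.id", groupId);
        props.put("metadata.max.age.ms", "30000"); // default is 300000 (5 minutes)
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }
}
```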
Upvotes: 4