André R.

Reputation: 1647

Loading offsets and metadata blocks KafkaConsumer after broker restart for a long time

We have the problem that calls to the 'poll' method of the new KafkaConsumer sometimes hang for as long as 20 to 30 minutes after one of our three Kafka brokers has been restarted.

We are using a three-broker Kafka setup (0.9.0.1). Our consumer processes use the new Java KafkaConsumer API, and we assign them to specific TopicPartitions.

For various reasons I can't show the real code here, but basically our code works like this:

Properties consumerProps=loadConsumerProperties();
// bootstrap.servers=<IP1>:9092,<IP2>:9092,<IP3>:9092
// group.id=consumer_group_gwbc2
// enable.auto.commit=false
// auto.offset.reset=latest
// session.timeout.ms=30000
// key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
// value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer

// Named kafkaConsumer so that it matches the references in the loop below
KafkaConsumer<String, byte[]> kafkaConsumer = new KafkaConsumer<>(consumerProps);
kafkaConsumer.assign(Arrays.asList(new TopicPartition("someTopic", 0)));

while (true) {

  // THIS CALL sometimes blocks for a very long time after a broker restart
  ConsumerRecords<String, byte[]> records = kafkaConsumer.poll(200);

  Iterator<ConsumerRecord<String, byte[]>> recordIter = records.iterator();
  while (recordIter.hasNext()) {                
     ConsumerRecord<String, byte[]> record = recordIter.next();

     // Very fast, actually just sending a UDP packet via Netty.
     processRecord(record); 

     if (lastCommitHappendFiveOrMoreSecondsAgo()) {   
       kafkaConsumer.commitAsync();
     }
  }
}

kafka-topics.sh describes the __consumer_offsets topic as follows:

Topic:__consumer_offsets    PartitionCount:50    ReplicationFactor:3    Configs:segment.bytes=104857600,cleanup.policy=compact,compression.type=uncompressed

The server.log of the restarted broker shows that loading the offsets from one specific partition of the __consumer_offsets topic takes a long time (in this case about 22 minutes). This matches the time during which the consumer's 'poll' call is blocked.

[2016-07-25 16:02:40,846] INFO [Group Metadata Manager on Broker 1]: Loading offsets and group metadata from [__consumer_offsets,15] (kafka.coordinator.GroupMetadataManager)
[2016-07-25 16:25:36,697] INFO [Group Metadata Manager on Broker 1]: Finished loading offsets from [__consumer_offsets,15] in 1375851 milliseconds.
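
For context, here is a minimal sketch of how a group.id could be mapped to one partition of __consumer_offsets. It assumes the broker takes the absolute value of the group id's hashCode modulo the partition count of the offsets topic (50 in the describe output above). The class and method names are made up for illustration; this is not the broker's own code.

```java
// Sketch: which __consumer_offsets partition holds a given consumer group?
// Assumption: partition = abs(groupId.hashCode()) % partitionCount.
final class OffsetsPartitionSketch {

    // Math.abs(Integer.MIN_VALUE) is still negative, so map that case to 0.
    static int safeAbs(int n) {
        return (n == Integer.MIN_VALUE) ? 0 : Math.abs(n);
    }

    // Hypothetical helper: returns the partition index for a group id.
    static int partitionFor(String groupId, int offsetsTopicPartitionCount) {
        return safeAbs(groupId.hashCode()) % offsetsTopicPartitionCount;
    }

    public static void main(String[] args) {
        String groupId = args.length > 0 ? args[0] : "consumer_group_gwbc2";
        System.out.println(groupId + " -> __consumer_offsets-" + partitionFor(groupId, 50));
    }
}
```

If that assumption holds, only groups that map to the slow-loading partition have to wait for it, which would explain why the blocking shows up for some consumers and not for others.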

I am wondering what makes the loading process so slow and what can be done about it.

Upvotes: 3

Views: 2197

Answers (1)

André R.

Reputation: 1647

Found the reason.

The server.properties configuration files for our brokers contain the property

log.cleaner.enable=false

(By default this property is true as of version 0.9.0.1.) This means that Kafka's internal compacted __consumer_offsets topic is not actually compacted, because the log cleaner is disabled. As a result, some partitions of this topic grew to several gigabytes, which explains the time needed to read through all of the consumer-offset data when a new group coordinator needs to refill its cache. Setting the property back to true re-enables compaction for this topic.
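
To check whether a broker is affected, one option is to look at how large the __consumer_offsets partitions are on disk. The following is a rough sketch that sums the *.log segment files per partition directory. It assumes the usual layout of one __consumer_offsets-<n> directory per partition inside the broker's log.dirs; the default path /tmp/kafka-logs and the class and method names are placeholders, not something taken from our setup.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Map;
import java.util.TreeMap;

// Sketch: report the on-disk segment size of each __consumer_offsets partition.
final class OffsetsTopicSizeSketch {

    // Sums the sizes of the *.log files in every __consumer_offsets-<n> directory
    // directly below logDir. Index files and other topics are ignored.
    static Map<String, Long> segmentBytesPerPartition(Path logDir) throws IOException {
        Map<String, Long> sizes = new TreeMap<>();
        try (DirectoryStream<Path> partitions =
                 Files.newDirectoryStream(logDir, "__consumer_offsets-*")) {
            for (Path partition : partitions) {
                if (!Files.isDirectory(partition)) {
                    continue;
                }
                long bytes = 0;
                try (DirectoryStream<Path> segments =
                         Files.newDirectoryStream(partition, "*.log")) {
                    for (Path segment : segments) {
                        bytes += Files.size(segment);
                    }
                }
                sizes.put(partition.getFileName().toString(), bytes);
            }
        }
        return sizes;
    }

    public static void main(String[] args) throws IOException {
        // Assumed default; pass the broker's real log.dirs value as the first argument.
        Path logDir = Paths.get(args.length > 0 ? args[0] : "/tmp/kafka-logs");
        segmentBytesPerPartition(logDir).forEach(
            (partition, bytes) -> System.out.printf("%s %d MiB%n", partition, bytes / (1024 * 1024)));
    }
}
```

Partitions that show up with several gigabytes here are the ones a new group coordinator has to read in full before it can serve offsets for the groups stored in them.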

Upvotes: 1
