Apache Kafka Java consumer does not receive message for topic with replication factor more than one

I'm starting on Apache Kakfa with a simple Producer, Consumer app in Java. I'm using kafka-clients version and running it on a Mac.

I created a topic named replicated_topic_partitioned with 3 partitions and with replication factor as 3.

I started the zookeeper at port 2181. I started three brokers with id 1, 2 and 3 on ports 9092, 9093 and 9094 respectively.

Here's the output of the describe command

kafka_2.12-2.3.0/bin/ --describe --topic replicated_topic_partitioned --bootstrap-server localhost:9092    
Topic:replicated_topic_partitioned    PartitionCount:3    ReplicationFactor:3    Configs:segment.bytes=1073741824
     Topic: replicated_topic_partitioned    Partition: 0    Leader: 3    Replicas: 3,1,2    Isr: 3,1,2
     Topic: replicated_topic_partitioned    Partition: 1    Leader: 1    Replicas: 1,2,3    Isr: 1,2,3
     Topic: replicated_topic_partitioned    Partition: 2    Leader: 2    Replicas: 2,3,1    Isr: 2,3,1

I wrote a simple producer and a consumer code. The producer ran successfully and published the messages. But when I start the consumer, the poll call just waits indefinitely. On debugging, I found that it keeps on looping at the awaitMetadataUpdate method on the ConsumerNetworkClient.

Here are the code for Producer and Consumer

Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

KafkaProducer<String, String> myProducer = new KafkaProducer<>(properties);
DateFormat dtFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss:SSS");
String topic = "replicated_topic_partitioned";

int numberOfRecords = 10;
try {
    for (int i = 0; i < numberOfRecords; i++) {
       String message = String.format("Message: %s  sent at %s", Integer.toString(i), dtFormat.format(new Date()));
       System.out.println("Sending " + message);
       myProducer.send(new ProducerRecord<String, String>(topic, message));

} catch (Exception e) {
} finally {

Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");  
properties.put("", UUID.randomUUID().toString());
properties.put("auto.offset.reset", "earliest");

KafkaConsumer<String, String> myConsumer = new KafkaConsumer<>(properties);

String topic = "replicated_topic_partitioned";

try {
    while (true){
        ConsumerRecords<String, String> records = myConsumer.poll(1000);
 } finally {

Adding some key-fields from



The for the other two brokers was a replica of the above with, the port and thelog.dirs changed.

This did not work for me: Kafka Java Consumer stuck in awaitMetadataUpdate()

But, if I start the consumer from the command line passing a partition, it successfully reads the messages for that partition. But it does not receive any message when just a topic is specified.


kafka_2.12-2.3.0/bin/ --topic replicated_topic_partitioned --bootstrap-server localhost:9092 
     --from-beginning --partition 1

Does not work:

kafka_2.12-2.3.0/bin/ --topic replicated_topic_partitioned --bootstrap-server localhost:9092 

NOTE: The above consumer works perfectly for a topic with replication factor equals 1.


  1. Why does the Java Producer not read any message for topic with replication factor more than one (even when assigning it to a partition) (like myConsumer.assign(Collections.singletonList(new TopicPartition(topic, 2))?

  2. Why does the console consumer read message only when passed a partition (again works for a topic with replication factor of one)

Disclaimer: This is not an answer.

The Java consumer is now working as expected. I did not do any change to the code or the configuration. The only thing I did was to restart my Mac. This caused the kafka-logs folder (and the zookeeper folder too I guess) to be deleted.

I re-created the topic (with the same command - 3 partitions, replication factor of 3). Then re-started the brokers with the same configuration - no or advertised.port config.

So, recreation of the kafka-logs and topics remediated something that was causing an issue earlier.

My only suspect is a non-properly terminated consumer. I ran the consumer code without the close call on the consumer in the finally block initially. I also had the same Maybe, all 3 partitions were assigned to consumers that weren't properly terminated or closed. This is just a guess..

But even calling myConsumer.position(new TopicPartition(topic, 2)) did not return a response earlier when I assigned the consumer to a partition. It was looping in the same awaitMetadataUpdate method.

Reputation: 24202

so, youre sending 10 records, but all 10 records have the SAME key:

for (int i = 0; i < numberOfRecords; i++) {
   String message = String.format("Message: %s  sent at %s", Integer.toString(i), dtFormat.format(new Date()));
   System.out.println("Sending " + message);
   myProducer.send(new ProducerRecord<String, String>(topic, message)); <--- KEY=topic

unless told otherwise (by setting a partition directly on the ProducerRecord) the partition into which a record is delivered is determine by something like:

partition = murmur2(serialize(key)) % numPartitions

so same key means same partition.

have you tried searching for your 10 records on partitions 0 and 2 maybe?

if you want a better "spread" of records amongst partitions, either use a null key (you'd get round robin) or a variable key.

