Thiyagu

Apache Kafka Java consumer does not receive message for topic with replication factor more than one

I'm getting started with Apache Kafka using a simple producer and consumer app in Java. I'm using kafka-clients version 0.10.0.1 and running it on a Mac.

I created a topic named replicated_topic_partitioned with 3 partitions and a replication factor of 3.

I started ZooKeeper on port 2181, and three brokers with ids 1, 2 and 3 on ports 9092, 9093 and 9094 respectively.

Here's the output of the describe command:

kafka_2.12-2.3.0/bin/kafka-topics.sh --describe --topic replicated_topic_partitioned --bootstrap-server localhost:9092    
Topic:replicated_topic_partitioned    PartitionCount:3    ReplicationFactor:3    Configs:segment.bytes=1073741824
     Topic: replicated_topic_partitioned    Partition: 0    Leader: 3    Replicas: 3,1,2    Isr: 3,1,2
     Topic: replicated_topic_partitioned    Partition: 1    Leader: 1    Replicas: 1,2,3    Isr: 1,2,3
     Topic: replicated_topic_partitioned    Partition: 2    Leader: 2    Replicas: 2,3,1    Isr: 2,3,1
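The describe output above can be read as a partition-to-leader map. In the client and broker versions used here, a consumer fetches each partition from that partition's leader broker, so it can help to pair each leader with the broker ports given in the question (ids 1, 2, 3 on 9092, 9093, 9094). The following is a small stdlib-only sketch for illustration; the class and method names are hypothetical, and it simply scans each line for the "Partition:" and "Leader:" fields.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class PartitionLeaders {
    // Broker id -> port, as described in the question (assumed all on localhost).
    static final Map<Integer, Integer> BROKER_PORTS = Map.of(1, 9092, 2, 9093, 3, 9094);

    // Extracts partition -> leader broker id from kafka-topics.sh --describe output.
    // Lines without both a "Partition:" and a "Leader:" field (e.g. the summary line) are skipped.
    static Map<Integer, Integer> leadersByPartition(String describeOutput) {
        Map<Integer, Integer> leaders = new LinkedHashMap<>();
        for (String line : describeOutput.split("\n")) {
            String[] tokens = line.trim().split("\\s+");
            Integer partition = null;
            Integer leader = null;
            for (int i = 0; i + 1 < tokens.length; i++) {
                if (tokens[i].equals("Partition:")) partition = Integer.parseInt(tokens[i + 1]);
                if (tokens[i].equals("Leader:")) leader = Integer.parseInt(tokens[i + 1]);
            }
            if (partition != null && leader != null) leaders.put(partition, leader);
        }
        return leaders;
    }

    public static void main(String[] args) {
        String describe = String.join("\n",
            "Topic: replicated_topic_partitioned    Partition: 0    Leader: 3    Replicas: 3,1,2    Isr: 3,1,2",
            "Topic: replicated_topic_partitioned    Partition: 1    Leader: 1    Replicas: 1,2,3    Isr: 1,2,3",
            "Topic: replicated_topic_partitioned    Partition: 2    Leader: 2    Replicas: 2,3,1    Isr: 2,3,1");
        for (Map.Entry<Integer, Integer> e : leadersByPartition(describe).entrySet()) {
            System.out.printf("Partition %d -> leader broker %d on localhost:%d%n",
                e.getKey(), e.getValue(), BROKER_PORTS.get(e.getValue()));
        }
    }
}
```

For this topic it shows that only partition 1 is led by the broker on the bootstrap port (9092); partitions 0 and 2 are led by the brokers on 9094 and 9093.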

I wrote simple producer and consumer code. The producer ran successfully and published the messages, but when I start the consumer, the poll call just waits indefinitely. While debugging, I found that it keeps looping in the awaitMetadataUpdate method of ConsumerNetworkClient.

Here is the code for the producer and the consumer.

Producer.java

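import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class Producer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> myProducer = new KafkaProducer<>(properties);
        DateFormat dtFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss:SSS");
        String topic = "replicated_topic_partitioned";

        int numberOfRecords = 10;
        try {
            for (int i = 0; i < numberOfRecords; i++) {
                String message = String.format("Message: %s  sent at %s", Integer.toString(i), dtFormat.format(new Date()));
                System.out.println("Sending " + message);
                // send() is asynchronous: the record is buffered and sent in the background
                myProducer.send(new ProducerRecord<String, String>(topic, message));
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // close() waits for buffered records to be sent before returning
            myProducer.close();
        }
    }
}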

Consumer.java

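import java.util.Collections;
import java.util.Properties;
import java.util.UUID;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class Consumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("group.id", UUID.randomUUID().toString());
        properties.put("auto.offset.reset", "earliest");

        KafkaConsumer<String, String> myConsumer = new KafkaConsumer<>(properties);

        String topic = "replicated_topic_partitioned";
        myConsumer.subscribe(Collections.singletonList(topic));

        try {
            while (true) {
                ConsumerRecords<String, String> records = myConsumer.poll(1000);
                printRecords(records);
            }
        } finally {
            myConsumer.close();
        }
    }

    // printRecords is not shown in the question; this is an assumed minimal version
    private static void printRecords(ConsumerRecords<String, String> records) {
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("partition=%d offset=%d value=%s%n", record.partition(), record.offset(), record.value());
        }
    }
}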

Here are the key fields from server.properties for broker 1:

broker.id=1 
host.name=localhost
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600

log.dirs=/tmp/kafka-logs-1
num.partitions=1
num.recovery.threads.per.data.dir=1

transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

zookeeper.connection.timeout.ms=6000

group.initial.rebalance.delay.ms=0

The server.properties files for the other two brokers are copies of the above, with only broker.id, the port and log.dirs changed.

I also tried the suggestions in "Kafka 0.9.0.1 Java Consumer stuck in awaitMetadataUpdate()", but they did not work for me.


However, if I start the console consumer from the command line and pass a partition, it successfully reads the messages for that partition. It does not receive any messages when only the topic is specified.

Works:

kafka_2.12-2.3.0/bin/kafka-console-consumer.sh --topic replicated_topic_partitioned --bootstrap-server localhost:9092 \
     --from-beginning --partition 1

Does not work:

kafka_2.12-2.3.0/bin/kafka-console-consumer.sh --topic replicated_topic_partitioned --bootstrap-server localhost:9092 \
    --from-beginning

NOTE: The above consumer works perfectly for a topic with a replication factor of 1.

Questions:

  1. Why does the Java consumer not read any messages from a topic with a replication factor greater than one, even when I assign it to a partition (for example, myConsumer.assign(Collections.singletonList(new TopicPartition(topic, 2))))?

  2. Why does the console consumer read messages only when it is given a partition? (Again, without a partition it works for a topic with a replication factor of one.)


Answers (2)

Thiyagu

Disclaimer: This is not an answer.

The Java consumer is now working as expected. I did not make any changes to the code or the configuration. The only thing I did was restart my Mac, which deleted the Kafka log folders (log.dirs pointed under /tmp) and, I assume, the ZooKeeper data folder too.

I re-created the topic with the same command (3 partitions, replication factor of 3). Then I restarted the brokers with the same configuration, with no advertised.host.name or advertised.port settings.

So recreating the Kafka logs and the topic fixed whatever was causing the issue earlier.


My only suspect is a consumer that was not terminated properly. Initially, I ran the consumer code without the close() call in the finally block, and I was using the same group.id on every run. Maybe all 3 partitions were assigned to consumers that were never properly closed. This is just a guess.

But earlier, even calling myConsumer.position(new TopicPartition(topic, 2)) after assigning the consumer to that partition never returned; it kept looping in the same awaitMetadataUpdate method.
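The guess above is about how partitions are shared among the members of a consumer group. As a rough illustration, here is a simplified single-topic model of range assignment (the default strategy in these client versions); it is a sketch, not the client's RangeAssignor code, and the member ids are hypothetical. Members are sorted, each gets a contiguous block of partitions, and the first numPartitions % numMembers members get one extra, so with 3 partitions and 4 live members one member gets nothing. Note that the group coordinator drops a member whose heartbeats stop for longer than session.timeout.ms, so stale consumers would only hold partitions until that timeout expires.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RangeAssignmentSketch {
    // Simplified range assignment for one topic: sorted members get contiguous partition blocks.
    static Map<String, List<Integer>> assign(List<String> members, int numPartitions) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        if (members.isEmpty()) return assignment;
        List<String> sorted = new ArrayList<>(members);
        sorted.sort(null); // natural (lexicographic) order of member ids
        int perMember = numPartitions / sorted.size();
        int extra = numPartitions % sorted.size();
        for (int i = 0; i < sorted.size(); i++) {
            int start = perMember * i + Math.min(i, extra);
            int length = perMember + (i < extra ? 1 : 0);
            List<Integer> partitions = new ArrayList<>();
            for (int p = start; p < start + length; p++) partitions.add(p);
            assignment.put(sorted.get(i), partitions);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // Hypothetical ids: three stale members from earlier runs plus the newly started consumer.
        List<String> members = List.of("consumer-a", "consumer-b", "consumer-c", "consumer-new");
        assign(members, 3).forEach((member, parts) -> System.out.println(member + " -> " + parts));
    }
}
```

Running main prints one partition each for consumer-a, consumer-b and consumer-c, and an empty list for consumer-new.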


radai

So, you're sending 10 records, but all 10 records have the SAME key:

for (int i = 0; i < numberOfRecords; i++) {
   String message = String.format("Message: %s  sent at %s", Integer.toString(i), dtFormat.format(new Date()));
   System.out.println("Sending " + message);
   myProducer.send(new ProducerRecord<String, String>(topic, message)); // <--- KEY=topic
}

Unless told otherwise (by setting a partition directly on the ProducerRecord), the partition into which a record is delivered is determined by something like:

partition = murmur2(serialize(key)) % numPartitions

So the same key means the same partition.
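The hash rule above can be sketched in plain Java. This is an illustration, not the client's code: the class and method names are hypothetical, the hash follows the 32-bit murmur2 routine Kafka uses for keyed records as far as I know (compare against the client's own Utils.murmur2 if exact partition numbers matter), the key is encoded as UTF-8 (what StringSerializer does by default), and the sign bit is cleared before the modulo so the result is never negative.

```java
import java.nio.charset.StandardCharsets;

public class KeyHashPartitioning {
    // 32-bit murmur2 hash, written to mirror the routine used for keyed records (illustrative sketch).
    static int murmur2(byte[] data) {
        int length = data.length;
        int seed = 0x9747b28c;
        final int m = 0x5bd1e995;
        final int r = 24;
        int h = seed ^ length;
        int length4 = length / 4;
        for (int i = 0; i < length4; i++) {
            int i4 = i * 4;
            int k = (data[i4] & 0xff) + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        // Mix in the trailing 1 to 3 bytes (intentional fall-through between cases).
        switch (length % 4) {
            case 3:
                h ^= (data[(length & ~3) + 2] & 0xff) << 16;
            case 2:
                h ^= (data[(length & ~3) + 1] & 0xff) << 8;
            case 1:
                h ^= data[length & ~3] & 0xff;
                h *= m;
        }
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    // Clears the sign bit so the modulo result is always in [0, numPartitions).
    static int partitionForKey(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        return (murmur2(keyBytes) & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        // The same key maps to the same partition on every call.
        for (int i = 0; i < 3; i++) {
            System.out.println("key=same-key -> partition " + partitionForKey("same-key", 3));
        }
    }
}
```

Running main prints the same partition number three times: a fixed key always lands in one partition.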

Have you tried looking for your 10 records in partitions 0 and 2?

If you want a better "spread" of records across partitions, either use a null key (you'd get round robin) or a varying key.
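For comparison, here is a simplified model of the null-key case. Note that the question's producer calls the two-argument ProducerRecord(topic, value) constructor, which sets only the value and leaves the key null; ProducerRecord(topic, key, value) is the form that sets a key. In the 0.10.x client, a record with no key and no explicit partition is spread across partitions by a per-record counter, while clients from 2.4 onward use a "sticky" partitioner for such records instead. The class name is hypothetical, and unlike the real partitioner (which seeds the counter randomly and only cycles over partitions that currently have a leader), this counter starts at 0, so only the even spread, not the exact order, is meant to match.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

public class NullKeyRoundRobin {
    // Shared counter, incremented once per record.
    private final AtomicInteger counter = new AtomicInteger(0);
    private final int numPartitions;

    NullKeyRoundRobin(int numPartitions) {
        this.numPartitions = numPartitions;
    }

    // Picks the next partition in round-robin order; the mask keeps the value non-negative on overflow.
    int nextPartition() {
        return (counter.getAndIncrement() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        NullKeyRoundRobin partitioner = new NullKeyRoundRobin(3);
        int[] recordsPerPartition = new int[3];
        // 10 null-key records, as in the question's producer loop
        for (int i = 0; i < 10; i++) {
            recordsPerPartition[partitioner.nextPartition()]++;
        }
        System.out.println(Arrays.toString(recordsPerPartition)); // prints [4, 3, 3]
    }
}
```

With 10 records and 3 partitions, every partition receives records, which is why a consumer reading only one partition would see just part of the messages.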
