Reputation: 17880
I'm starting out with Apache Kafka, building a simple producer/consumer app in Java. I'm using kafka-clients version 0.10.0.1 and running it on a Mac.
I created a topic named replicated_topic_partitioned with 3 partitions and a replication factor of 3. I started ZooKeeper on port 2181 and three brokers with ids 1, 2 and 3 on ports 9092, 9093 and 9094 respectively.
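The topic was created with roughly the following command (reconstructed from memory, so the exact flags may have differed slightly):
kafka_2.12-2.3.0/bin/kafka-topics.sh --create --topic replicated_topic_partitioned \
    --partitions 3 --replication-factor 3 --bootstrap-server localhost:9092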
Here's the output of the describe command
kafka_2.12-2.3.0/bin/kafka-topics.sh --describe --topic replicated_topic_partitioned --bootstrap-server localhost:9092
Topic:replicated_topic_partitioned PartitionCount:3 ReplicationFactor:3 Configs:segment.bytes=1073741824
Topic: replicated_topic_partitioned Partition: 0 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
Topic: replicated_topic_partitioned Partition: 1 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: replicated_topic_partitioned Partition: 2 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
I wrote a simple producer and a simple consumer. The producer ran successfully and published the messages. But when I start the consumer, the poll call just waits indefinitely. On debugging, I found that it keeps looping in the awaitMetadataUpdate method of ConsumerNetworkClient.
Here is the code for the producer and the consumer.
Producer.java
Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

KafkaProducer<String, String> myProducer = new KafkaProducer<>(properties);
DateFormat dtFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss:SSS");
String topic = "replicated_topic_partitioned";
int numberOfRecords = 10;

try {
    // Send 10 timestamped messages to the topic
    for (int i = 0; i < numberOfRecords; i++) {
        String message = String.format("Message: %s sent at %s", Integer.toString(i), dtFormat.format(new Date()));
        System.out.println("Sending " + message);
        myProducer.send(new ProducerRecord<String, String>(topic, message));
    }
} catch (Exception e) {
    e.printStackTrace();
} finally {
    myProducer.close();
}
Consumer.java
Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// A fresh, random group.id on every run, reading from the earliest offsets
properties.put("group.id", UUID.randomUUID().toString());
properties.put("auto.offset.reset", "earliest");

KafkaConsumer<String, String> myConsumer = new KafkaConsumer<>(properties);
String topic = "replicated_topic_partitioned";
myConsumer.subscribe(Collections.singletonList(topic));

try {
    while (true) {
        // This poll call is where the consumer hangs
        ConsumerRecords<String, String> records = myConsumer.poll(1000);
        printRecords(records);
    }
} finally {
    myConsumer.close();
}
Here are some key fields from server.properties:
broker.id=1
host.name=localhost
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs-1
num.partitions=1
num.recovery.threads.per.data.dir=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
The server.properties for the other two brokers is identical to the above, except for broker.id, the port, and log.dirs.
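For example, the second broker's file differs roughly like this (a sketch; I'm assuming the port is set via the listeners property):
broker.id=2
listeners=PLAINTEXT://localhost:9093
log.dirs=/tmp/kafka-logs-2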
This did not work for me: Kafka 0.9.0.1 Java Consumer stuck in awaitMetadataUpdate()
However, if I start the console consumer from the command line and pass a partition, it successfully reads the messages for that partition. It does not receive any messages when only the topic is specified.
Works:
kafka_2.12-2.3.0/bin/kafka-console-consumer.sh --topic replicated_topic_partitioned --bootstrap-server localhost:9092
--from-beginning --partition 1
Does not work:
kafka_2.12-2.3.0/bin/kafka-console-consumer.sh --topic replicated_topic_partitioned --bootstrap-server localhost:9092
--from-beginning
NOTE: The above consumer works perfectly for a topic with a replication factor of 1.
Questions:
1. Why does the Java consumer not read any messages from a topic with a replication factor greater than one, even when it is assigned to a specific partition, e.g. myConsumer.assign(Collections.singletonList(new TopicPartition(topic, 2))) (sketched below)?
2. Why does the console consumer read messages only when passed a partition (again, it works for a topic with a replication factor of one)?
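For reference, the partition-assignment variant mentioned in question 1 looked roughly like this (a sketch, not my exact code; with the random group.id and auto.offset.reset=earliest it should read the partition from the beginning):

// Assign the consumer directly to partition 2 instead of subscribing to the topic
myConsumer.assign(Collections.singletonList(new TopicPartition(topic, 2)));

while (true) {
    // Still loops in awaitMetadataUpdate, just like the subscribe-based consumer
    ConsumerRecords<String, String> records = myConsumer.poll(1000);
    printRecords(records);
}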
Upvotes: 0
Views: 1477
Reputation: 17880
Disclaimer: This is not an answer.
The Java consumer is now working as expected. I did not make any changes to the code or the configuration. The only thing I did was restart my Mac, which caused the kafka-logs folder (and, I guess, the zookeeper folder too) to be deleted.
I re-created the topic with the same command (3 partitions, replication factor of 3), then restarted the brokers with the same configuration - no advertised.host.name or advertised.port config.
So, recreating the kafka-logs folders and the topic fixed whatever was causing the issue earlier.
My only suspect is an improperly terminated consumer. Initially, I ran the consumer code without the close call in the finally block, and I also used the same group.id. Maybe all 3 partitions were assigned to consumers that were never properly closed. This is just a guess.
But even calling myConsumer.position(new TopicPartition(topic, 2)) did not return a response earlier when I assigned the consumer to a partition; it kept looping in the same awaitMetadataUpdate method.
Upvotes: 0
Reputation: 24192
So, you're sending 10 records, but all 10 records have the SAME key:
for (int i = 0; i < numberOfRecords; i++) {
    String message = String.format("Message: %s sent at %s", Integer.toString(i), dtFormat.format(new Date()));
    System.out.println("Sending " + message);
    myProducer.send(new ProducerRecord<String, String>(topic, message)); <--- KEY=topic
}
Unless told otherwise (by setting a partition directly on the ProducerRecord), the partition into which a record is delivered is determined by something like:
partition = murmur2(serialize(key)) % numPartitions
So the same key means the same partition.
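As an illustration, the idea is roughly this (a sketch, not the actual DefaultPartitioner source; it assumes the public Utils.murmur2 and Utils.toPositive helpers in org.apache.kafka.common.utils):

// Hash the serialized key with murmur2 and take it modulo the partition count.
// Every record with the same key produces the same hash, hence the same partition.
byte[] keyBytes = new StringSerializer().serialize("replicated_topic_partitioned", "some-key");
int numPartitions = 3;
int partition = Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;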
Have you tried searching for your 10 records on partitions 0 and 2, maybe?
If you want a better "spread" of records amongst partitions, either use a null key (you'd get round robin) or a variable key.
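For example, something like this gives each record its own key (a sketch; it uses the loop index as the key so records can hash to different partitions):

// Use the (topic, key, value) constructor so each record carries a distinct key
myProducer.send(new ProducerRecord<String, String>(topic, Integer.toString(i), message));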
Upvotes: 1