Reputation: 117
I am having trouble getting a very basic Kafka consumer to work. I am using kafka-clients-1.1.0.jar.
Here is all that I have done.
zookeeper-server-start.bat ../../config/zookeeper.properties
kafka-server-start.bat ../../config/server.properties
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic hellotopic
Created topic "hellotopic".
Verify by listing the topics
D:\RC\Softwares\kafka_2.12-1.1.0\kafka_2.12-1.1.0\bin\windows>kafka-topics.bat --list --zookeeper localhost:2181
hellotopic
kafka-console-producer.bat --broker-list localhost:9092 --topic hellotopic --property "parse.key=true" --property "key.separator=:"
Message key and value entered as below
key1:value1
The console consumer shows that the message is in the topic 'hellotopic':
kafka-console-consumer.bat --zookeeper localhost:2181 --topic hellotopic --from-beginning
The output of the above command is shown below. We can see the message value 'value1' that was posted.
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
value1
Now that we have a topic with a message in it, I run my simple Java kafka consumer code to fetch all messages in the topic 'hellotopic'. Below is the code
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
public class SampleConsumer {
public static void main(String[] args) {
System.out.println("Start consumer code");
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-consumer-group");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("hellotopic"));
//while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
//}
System.out.println("End consumer code");
}
}
When I run the above class, this is the output:
Start consumer code
End consumer code
I have tried a lot to find the issue, but no luck yet. I would much appreciate help with this simple example.
Upvotes: 1
Views: 5019
Reputation: 1208
I see two issues with the code:
First, the consumer does not set auto.offset.reset, which defaults to latest. Add this to the properties:
props.put("auto.offset.reset", "earliest");
The --from-beginning flag in your command-line consumer actually translates to this config. It tells the consumer to start from the earliest offset if no committed offset is found for the corresponding topic and partition within the group.
Second, the actual poll should be in a loop. One poll may not give the consumer enough time to complete the subscription and also fetch data. One common way to do the poll is this:
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
} finally {
consumer.close();
}
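To make the auto.offset.reset point above more concrete, here is a rough sketch of how the starting position gets decided. This is a simplified model and not Kafka's actual implementation: the class OffsetResetSketch and the method startingOffset are made up for illustration, and it needs no broker or Kafka jar to run. Only the property name auto.offset.reset and the values earliest and latest come from Kafka.

```java
import java.util.OptionalLong;
import java.util.Properties;

public class OffsetResetSketch {

    // Hypothetical helper: a simplified model of where a consumer starts
    // reading a partition. It is NOT part of the Kafka client API.
    public static long startingOffset(Properties props, OptionalLong committed,
                                      long logStart, long logEnd) {
        // A committed offset for the group takes precedence over the policy.
        if (committed.isPresent()) {
            return committed.getAsLong();
        }
        // Without a committed offset the reset policy decides.
        // Kafka's default for auto.offset.reset is "latest".
        String policy = props.getProperty("auto.offset.reset", "latest");
        switch (policy) {
            case "earliest":
                return logStart; // read everything already in the partition
            case "latest":
                return logEnd;   // read only messages produced from now on
            default:
                // Stand-in for the error the real client raises in this case.
                throw new IllegalStateException("No committed offset, policy: " + policy);
        }
    }

    public static void main(String[] args) {
        // hellotopic holds one message (offset 0), so logStart = 0 and logEnd = 1.
        Properties props = new Properties();

        // Default policy: start after the existing message, so nothing is read.
        System.out.println(startingOffset(props, OptionalLong.empty(), 0L, 1L)); // 1

        // With "earliest": start at offset 0 and read key1:value1.
        props.put("auto.offset.reset", "earliest");
        System.out.println(startingOffset(props, OptionalLong.empty(), 0L, 1L)); // 0
    }
}
```

In this model a new group with the default policy starts at the end of the log, which is why the message that was produced earlier is never returned. Note also that once the group has a committed offset, changing auto.offset.reset no longer affects where it starts.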
Upvotes: 1