vigneshr35
vigneshr35

Reputation: 117

Why is my simple Kafka Consumer example not working

I am facing issues in getting a very basic kafka consumer to work. I am using the kafka-clients-1.1.0.jar

Here is all that I have done.

  1. Started zookeeper on command line (All commands are run from )

zookeeper-server-start.bat ../../config/zookeeper.properties

  1. Started Kafka server

kafka-server-start.bat ../../config/server.properties

  1. Created a new topic 'hellotopic' and verified it by listing the topics

kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic hellotopic Created topic "hellotopic".

Verify by listing the topics

D:\RC\Softwares\kafka_2.12-1.1.0\kafka_2.12-1.1.0\bin\windows>kafka-topics.bat --list --zookeeper localhost:2181 hellotopic

  1. Post message to the topic and verified the same on console consumer

kafka-console-producer.bat --broker-list localhost:9092 --topic hellotopic --property "parse.key=true" --property "key.separator=:"

Message key and value entered as below

key1:value1

You can see that on the console consumer we are able to see the message in topic 'hellotopic'

kafka-console-consumer.bat --zookeeper localhost:2181 --topic hellotopic --from-beginning

Output for above command is as shown below. We can see the message value 'value1' that was posted

Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper]. value1

Now that we have a topic with a message in it, I run my simple Java kafka consumer code to fetch all messages in the topic 'hellotopic'. Below is the code

import java.util.Arrays\;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
public class SampleConsumer {
    public static void main(String[] args) {
        System.out.println("Start consumer code");
        Properties props = new Properties();
         props.put("bootstrap.servers", "localhost:9092");
         props.put("group.id", "test-consumer-group");
         props.put("enable.auto.commit", "true");
         props.put("auto.commit.interval.ms", "1000");
         props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
         props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
         KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
         consumer.subscribe(Arrays.asList("hellotopic"));
         //while (true) {
             ConsumerRecords<String, String> records = consumer.poll(100);
             for (ConsumerRecord<String, String> record : records)
                 System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
         //}
         System.out.println("End consumer code");
    }
}

When we run the above class, here is the output seen

Start consumer code
End consumer code

Tried a lot to find the issue, but no luck yet. Much appreciate help on this simple example.

Upvotes: 1

Views: 5019

Answers (1)

vahid
vahid

Reputation: 1208

I see two issues with the code:

  1. You are missing a particular config that makes the consumer start from the earliest offset: props.put("auto.offset.reset", "earliest"); The --from-beginning in your command line consumer actually translated to this config. This config tells the consumer to start from the earliest offset if there no committed offset found for the corresponding topic and partition within the group.
  2. The actual poll should be in a loop. One poll may not give the consumer enough time to do the subscription and also fetch data. One common way to do the poll is this:

    try {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
    } finally {
        consumer.close();
    }
    

Upvotes: 1

Related Questions