Shades88

Reputation: 8360

Kafka server contains messages but consumer cannot receive any

Using a producer, I published some messages to a Kafka server running on localhost. ZooKeeper is also running on localhost. I used ConsumerGroupExample as given here.

However, the consumer does not seem to receive any messages. The kafka-console-consumer.sh script can pull out all of those messages, but my code cannot. What is wrong? The consumer code is exactly as given on that page.

zookeeper="localhost:2181", group id = "test-a", topic = "test". 

This is the same topic on which I published messages. Here's code for the producer:

package test;

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class KakfaProducer {
    public static void main(String[] args) {
        Properties prop = new Properties();
        prop.put("zookeeper.connect", "localhost:2181");
        prop.put("metadata.broker.list", "localhost:9092");
        prop.put("serializer.class", "kafka.serializer.StringEncoder");
        ProducerConfig producerConfig = new ProducerConfig(prop);
        Producer<String, String> producer = new Producer<String, String>(producerConfig);
        // Send messages from a separate thread
        Task t = new Task(producer);
        Thread newThread = new Thread(t);
        newThread.start();
    }
}

class Task implements Runnable {
    private final Producer<String, String> producer;

    public Task(Producer<String, String> producer) {
        this.producer = producer;
    }

    public void run() {
        String topic = "test";
        long num = 1;
        // Publish numbered test messages to the topic indefinitely
        while (true) {
            KeyedMessage<String, String> message =
                    new KeyedMessage<String, String>(topic, "Hello Test message " + num);
            producer.send(message);
            // Log progress after every 1000th message
            if (num % 1000 == 0) {
                System.out.println("Total messages sent by Thread-"
                        + Thread.currentThread().getId() + ": " + num);
            }
            num++;
        }
    }
}

Upvotes: 1

Views: 2919

Answers (1)

Jin Lee

Reputation: 3522

Try changing your group.id to something else and test again.

When you used kafka-console-consumer.sh to test, it consumed data from the topic and advanced the committed offset. That's why your consumer code couldn't read those messages with the same group.id.

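Also, when you change your group.id, add this to your consumer:

props.put("auto.offset.reset", "earliest");

This makes sure that a new consumer group, which has no committed offset yet, consumes data from the beginning. Note that "earliest" is the value for the newer KafkaConsumer client; the older ZooKeeper-based high-level consumer, which ConsumerGroupExample uses, expects "smallest" instead.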
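As a minimal sketch of the configuration described above, the following builds consumer properties for a fresh group. It assumes the ZooKeeper-based consumer from the question; the class name ConsumerProps, the helper build(), and the group id "test-b" are hypothetical names chosen for illustration. Only java.util.Properties is used, so it runs without the Kafka libraries.

```java
import java.util.Properties;

public class ConsumerProps {
    // Builds consumer settings for a given group id and offset reset policy.
    // Assumption: "smallest" for the old high-level consumer,
    // "earliest" for the newer KafkaConsumer client.
    public static Properties build(String groupId, String resetPolicy) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181"); // value from the question
        props.put("group.id", groupId);
        props.put("auto.offset.reset", resetPolicy);
        return props;
    }

    public static void main(String[] args) {
        // "test-b" is a hypothetical new group id, different from "test-a"
        Properties props = build("test-b", "smallest");
        System.out.println(props.getProperty("group.id") + " "
                + props.getProperty("auto.offset.reset"));
    }
}
```

The resulting Properties object would then be passed to the consumer configuration in place of the one built in ConsumerGroupExample.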

Why your consumer didn't receive any data, and what to try:

  1. You already consumed with the same group.id before, so the group has a committed offset past the existing messages. It will receive data only when new messages arrive at the broker. (Change the group.id and test again.)

  2. Try auto.offset.reset=earliest. A consumer group with no committed offset will then read messages from the beginning, which is useful for testing your consumer. There are also ways to read from a specific position: seek(), seekToBeginning(), and seekToEnd() (methods of the newer KafkaConsumer client).
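The two points above can be illustrated with a toy model of per-group committed offsets. This is a simplified sketch, not the Kafka API: the class OffsetModel and its methods are hypothetical, it models a single partition, and it commits the offset on every poll.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class OffsetModel {
    // Messages in a single partition, in order
    private final List<String> log = new ArrayList<>();
    // Group id -> next offset that group will read
    private final Map<String, Integer> committed = new HashMap<>();

    public void produce(String message) {
        log.add(message);
    }

    // Returns the messages a group would receive, then commits its new position.
    // A group without a committed offset starts at 0 if resetToEarliest is true,
    // otherwise at the current end of the log.
    public List<String> poll(String groupId, boolean resetToEarliest) {
        int start = committed.containsKey(groupId)
                ? committed.get(groupId)
                : (resetToEarliest ? 0 : log.size());
        List<String> out = new ArrayList<>(log.subList(start, log.size()));
        committed.put(groupId, log.size());
        return out;
    }

    public static void main(String[] args) {
        OffsetModel topic = new OffsetModel();
        topic.produce("m1");
        topic.produce("m2");
        topic.produce("m3");
        // Group joins after the messages were produced, reset policy "latest"
        System.out.println(topic.poll("test-a", false).size());
        topic.produce("m4");
        // Same group only sees data produced since its committed offset
        System.out.println(topic.poll("test-a", false).size());
        // A new group with reset-to-earliest reads everything
        System.out.println(topic.poll("test-b", true).size());
    }
}
```

In this model the group "test-a" sees nothing until new data is produced, while a new group that resets to the earliest offset reads the whole log.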

Upvotes: 1
