Reputation: 8360
Using a producer, I published some messages to a Kafka server running on localhost. ZooKeeper is also on localhost. I used the ConsumerGroupExample as given here.
However, the consumer does not seem to receive any messages. The kafka-console-consumer.sh script can read all of those messages, but the code cannot. What is wrong? The consumer code is exactly as given on that page.
The consumer settings are zookeeper = "localhost:2181", group id = "test-a", topic = "test".
This is the same topic to which I published the messages. Here's the code for the producer:
package test;

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class KakfaProducer {
    public static void main(String[] args) {
        Properties prop = new Properties();
        prop.put("zookeeper.connect", "localhost:2181");
        prop.put("metadata.broker.list", "localhost:9092");
        prop.put("serializer.class", "kafka.serializer.StringEncoder");
        ProducerConfig producerConfig = new ProducerConfig(prop);
        Producer<String, String> producer = new Producer<String, String>(producerConfig);
        // Publish from a separate thread.
        Task t = new Task(producer);
        Thread newThread = new Thread(t);
        newThread.start();
    }
}

class Task implements Runnable {
    private final Producer<String, String> producer;

    public Task(Producer<String, String> producer) {
        this.producer = producer;
    }

    public void run() {
        long num = 1;
        String topic = "test";
        // Send messages forever, logging progress every 1000 messages.
        while (true) {
            KeyedMessage<String, String> message =
                    new KeyedMessage<String, String>(topic, "Hello Test message " + num);
            producer.send(message);
            // num is local to this thread, so no synchronization is needed.
            num++;
            if (num % 1000 == 0) {
                System.out.println("Total messages sent by Thread-" + Thread.currentThread().getId() + ": " + num);
            }
        }
    }
}
Upvotes: 1
Views: 2919
Reputation: 3522
Try changing your group.id to something else and test again.
When you tested with kafka-console-consumer.sh, data was consumed from the topic and an offset was committed. If your consumer code uses a group.id that already has a committed offset, it resumes from that offset instead of rereading the old messages.
Also, when you change your group.id, add this to your consumer:
props.put("auto.offset.reset", "earliest");
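This makes sure that your new consumer group reads the topic from the beginning. (With the older ZooKeeper-based high-level consumer, which is configured through zookeeper.connect as in the question, the equivalent value is "smallest".)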
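As a rough sketch of the settings described above, the helper below builds the properties for a consumer group that has not consumed the topic before. It assumes the newer KafkaConsumer client (hence bootstrap.servers and the deserializer settings); the class name FreshGroupConfig and the group id "test-b" are made up for illustration, and localhost:9092 is taken from the broker list in the question.

```java
import java.util.Properties;

// Sketch only: builds settings for a consumer group with no committed offsets.
public class FreshGroupConfig {

    public static Properties build(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        // Use a group id that has never consumed this topic.
        props.put("group.id", groupId);
        // Applies only when the group has no committed offset yet.
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        // "test-b" is a hypothetical new group id.
        Properties props = build("localhost:9092", "test-b");
        System.out.println(props.getProperty("group.id"));
        System.out.println(props.getProperty("auto.offset.reset"));
    }
}
```

The resulting Properties object would then be passed to the consumer's constructor in place of the existing configuration.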
To summarize why your consumer didn't receive any data:
1. The same group.id was already used to consume the topic, so it has a committed offset. The consumer will receive data only when new data arrives in the broker. (Change the group.id and test again.)
2. Try auto.offset.reset=earliest. The consumer will then read messages from the beginning, which is useful for testing your consumer. There are also ways to read from a specific position: seek(), seekToBeginning(), and seekToEnd().
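To make the offset reasoning above concrete, here is a toy model of how a group's starting position is chosen. This is not Kafka code: OffsetModel and its methods are hypothetical, the offsets are invented, and it ignores details such as out-of-range offsets. It only illustrates that a committed offset takes precedence and that auto.offset.reset matters only for a group that has none.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of how a consumer group picks its starting offset. Not Kafka code.
public class OffsetModel {
    // Committed offset per group id (in-memory stand-in for the offset store).
    private final Map<String, Long> committed = new HashMap<>();
    private final long logStart; // offset of the oldest retained message
    private final long logEnd;   // offset the next produced message will get

    public OffsetModel(long logStart, long logEnd) {
        this.logStart = logStart;
        this.logEnd = logEnd;
    }

    public void commit(String groupId, long offset) {
        committed.put(groupId, offset);
    }

    // A committed offset always wins; the reset policy is used only without one.
    public long startPosition(String groupId, String autoOffsetReset) {
        Long saved = committed.get(groupId);
        if (saved != null) {
            return saved;
        }
        return "earliest".equals(autoOffsetReset) ? logStart : logEnd;
    }

    public static void main(String[] args) {
        OffsetModel topic = new OffsetModel(0, 5000);
        topic.commit("test-a", 5000); // "test-a" has already read everything

        System.out.println(topic.startPosition("test-a", "earliest")); // 5000
        System.out.println(topic.startPosition("test-b", "earliest")); // 0
        System.out.println(topic.startPosition("test-b", "latest"));   // 5000
    }
}
```

In this model, "test-a" sees nothing until new messages arrive, while the unused group "test-b" with "earliest" starts at offset 0 and rereads the whole topic.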
Upvotes: 1