Reputation: 41
I am learning Kafka by following the Apache Kafka documentation. I started ZooKeeper and the broker with the default configuration:
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties &
I ran kafka-console-producer.sh and kafka-console-consumer.sh to produce and consume messages, and that worked. I then wrote Java code using the producer API to produce messages, which also works; I verified it with kafka-console-consumer.sh. The code is the same as in the Apache Kafka guide:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 100; i++)
    producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));
producer.close();
Although the producer code works, the consumer code does not. No exception is thrown; it just blocks at consumer.poll(100). The code is from the Apache Kafka documentation:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records)
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
By the way, the kafka-console-consumer.sh example from the Apache Kafka documentation successfully consumes the messages that the producer sent to the topic "test":
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
But if I connect to the Kafka broker directly instead of ZooKeeper, it does not work either: there is no error and no exception, it just blocks.
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
The Kafka version and the client API version are both 0.11.0.0.
Why can't they consume messages?
Upvotes: 4
Views: 1841
Reputation: 10065
Using the --zookeeper parameter means using the old consumer, and it works well because you are specifying a ZooKeeper server (localhost:2181).
When you want to specify a Kafka broker (so using the new consumer) you have to use the --bootstrap-server option: you are still using --zookeeper but passing a valid Kafka broker address (localhost:9092).
So for the console consumer application your configuration needs to be --bootstrap-server localhost:9092 instead of --zookeeper localhost:9092.
Regarding your code, are you sure that the poll method is blocked? If there are no records, it should return after 100 ms (the timeout you specified) rather than block.
Also, from your code I see that the producer is sending to "my-topic", the consumer subscribes to "foo" and "bar", and the console consumer reads from "test". These are all different topics!
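One way to avoid this kind of topic mismatch is to keep the topic name in a single place and derive everything else from it. The following is only a minimal sketch using the JDK alone; the class TopicNames and its methods are hypothetical helpers, not part of the Kafka API, and the topic name "my-topic" is taken from the producer code in the question.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper (not part of Kafka): one source of truth for the topic name.
class TopicNames {
    // Assumption: the topic the producer in the question writes to.
    static final String TOPIC = "my-topic";

    // The list you would pass to consumer.subscribe(...).
    static List<String> subscription() {
        return Arrays.asList(TOPIC);
    }

    // The console consumer command line that reads the same topic via the broker.
    static String consoleConsumerCommand(String bootstrapServer) {
        return "bin/kafka-console-consumer.sh --bootstrap-server " + bootstrapServer
                + " --topic " + TOPIC + " --from-beginning";
    }

    public static void main(String[] args) {
        System.out.println("subscribe to: " + subscription());
        System.out.println(consoleConsumerCommand("localhost:9092"));
    }
}
```

With this, the producer record, the subscription and the console check all refer to TopicNames.TOPIC, so they cannot silently drift apart.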
Upvotes: 1
Reputation: 49
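Add this property; it may work:
props.put("auto.offset.reset", "earliest");
Note that the new consumer (KafkaConsumer) accepts "earliest", "latest" or "none" here; "smallest" is the old consumer's name for "earliest".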
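As a rough sketch of where that setting would go, here are the consumer properties from the question with the offset reset policy added. It uses only java.util.Properties so it runs without the Kafka client on the classpath; the class name ConsumerPropsSketch and the method consumerProps are made up for illustration. The value "earliest" is the new-consumer counterpart of the old consumer's "smallest", and it only takes effect when the consumer group has no committed offset for a partition yet.

```java
import java.util.Properties;

// Hypothetical helper: builds the consumer configuration from the question,
// plus auto.offset.reset. Pass the result to new KafkaConsumer<>(props).
class ConsumerPropsSketch {
    static Properties consumerProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        // Start from the beginning of the log when the group has no committed offset.
        // Valid values for the new consumer: "earliest", "latest" (default), "none".
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        Properties props = consumerProps("localhost:9092", "test");
        props.forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

If the group "test" has already committed offsets, the reset policy is ignored; trying a fresh group.id is one way to check whether that is the case.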
Upvotes: 0