Praveen Kumar

Reputation: 31

Kafka Consumer is not reading the topic data through Java

I am sending JSON data to a Kafka topic. I can see the same JSON data in the console consumer using the command below.

.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic mytopic --from-beginning

Case 1: When I try to read the records with a Kafka consumer written in Java, no records appear in the Java console. I have tried the examples given at https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html

Case 2: If I send a message from the console producer (command window), it appears in the console consumer (command window), and I can also see the same record in the Java console. This scenario works.

If I submit the data to the topic through a Java program, the same JSON data appears in the console consumer (command window) but not in the Java console. So Case 1 is not working and Case 2 is working. Is there any configuration that needs to be done?

Upvotes: 3

Views: 3701

Answers (3)

Wisani Salani

Reputation: 144

Make sure you are not sending empty lines at the end, and that the consumer is not reading only the latest records. Set auto.offset.reset: 'earliest' as in the other answers, or in Java: properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

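Then call consumer.seekToBeginning(consumer.assignment()); to make sure. Note that when you use subscribe(), partitions are only assigned during poll(), so assignment() is empty before the first poll.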

Upvotes: 1

Ashish Bhosle

Reputation: 657

Assuming the producer and consumer code is correct:

  • Stop all consumers.
  • Reset the offsets of your consumer group:

$KAFKA_HOME/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group group_name --topic topic_name --reset-offsets --to-earliest --execute

  • Now start your consumer

This should fix your issue.

Here are some Kafka consumer properties:

    bootstrap.servers: 'localhost:9092'
    group.id: 'group_id'
    auto.offset.reset: 'earliest'
    key.deserializer: 'org.apache.kafka.common.serialization.*' //Replace * with class
    value.deserializer: 'org.apache.kafka.common.serialization.*'
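
The properties above could be built in Java roughly as follows. This is a minimal sketch that uses plain string keys so it compiles without the Kafka client on the classpath. The class name `ConsumerProps`, the method `build`, and the choice of `StringDeserializer` for both key and value are assumptions for illustration (it assumes the JSON is sent as text).

```java
import java.util.Properties;

// Hypothetical helper, not part of the Kafka API.
class ConsumerProps {

    // Builds consumer properties matching the list above.
    static Properties build(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        // Only takes effect when the group has no committed offset
        // for a partition (or the committed offset is out of range).
        props.setProperty("auto.offset.reset", "earliest");
        // Assumption: keys and values are plain strings (JSON as text).
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        // Print the resulting configuration for inspection.
        build("localhost:9092", "group_id")
                .forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

The returned object can be passed to the `KafkaConsumer` constructor that accepts a `Properties` argument.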

Thanks

Upvotes: 0

Nishu Tayal

Reputation: 20830

You need to set ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest" in order to read from the beginning.

kafkaConsumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest"); 

Also make sure that you are not running several consumer processes with the same consumer group ID. Within a group, each partition is read by only one consumer, so the data from a partition may go to one process while another sees nothing.
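
While debugging, one way to rule out both a competing process in the same group and previously committed offsets is to start the consumer under a group ID that has never been used. A minimal sketch, where the class name `FreshGroupId` and the `debug-consumer-` prefix are made up for illustration:

```java
import java.util.UUID;

// Hypothetical helper, not part of the Kafka API.
class FreshGroupId {

    // Returns a group ID that is effectively unique per call.
    static String next(String prefix) {
        return prefix + UUID.randomUUID();
    }

    public static void main(String[] args) {
        // Use the result as the value of the group.id property.
        System.out.println(next("debug-consumer-"));
    }
}
```

Because a new group has no committed offsets, auto.offset.reset set to "earliest" then takes effect and the consumer starts from the beginning of each partition. This is meant for troubleshooting only; a long-running consumer normally keeps a stable group ID.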

Upvotes: 0
