Meisam Salami
Meisam Salami

Reputation: 68

kafka consumer does not recieve messages in java

in my code bellow consumer subscribe to existing topic but doesn't receive messages from topic please help me and it wait for messages whereas in kafka console consumer messages received properly

package kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

public class SimpleConsumer {
    public static void main(String[] args) throws Exception {

        //Kafka consumer configuration settings
        String topicName = "test12";
        Properties props = new Properties();

        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put("key.deserializer",
                        "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer
            <String, String>(props);

        //Kafka Consumer subscribes list of topics here.
        consumer.subscribe(Arrays.asList(topicName));

        //print the topic name
        System.out.println("Subscribed to topic " + topicName);
        int i = 0;

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records)

                // print the offset,key and value for the consumer records.
                System.out.printf("offset = %d, key = %s, value = %s\n",
                    record.offset(), record.key(), record.value());
        }
    }
}

Upvotes: 1

Views: 1891

Answers (1)

ashwin konale
ashwin konale

Reputation: 91

It may be the case that offset is at the latest position. You can try seek from beginning.

        HashSet<TopicPartition> partitions = new HashSet<TopicPartition>();
        for (TopicPartition partition : partitions) {
            long offset = consumer.position(partition);
            System.out.println(partition.partition() + ": " + offset);
        }
        consumer.seekToBeginning(partitions);

Upvotes: 1

Related Questions