Prasath

Reputation: 1284

Consume live messages in Kafka

I have started ZooKeeper and the Kafka server. I then started a Kafka producer, which sent 10 messages to topic 'xxx', and stopped it. After that I started my Kafka consumer and subscribed it to topic 'xxx'. The consumer receives the 10 messages sent by the producer, which is no longer running. I need the consumer to receive only live messages, that is, messages published while the consumer is running. Is there any way to achieve this? These are my consumer properties:

    props.put("bootstrap.servers", "localhost:9092");
    String consumeGroup = "cg1";
    props.put("group.id", consumeGroup);
    props.put("enable.auto.commit", "true");
    props.put("auto.offset.reset", "earliest");
    props.put("auto.commit.interval.ms", "100");
    props.put("heartbeat.interval.ms", "3000");
    props.put("session.timeout.ms", "30000");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

Upvotes: 3

Views: 4600

Answers (2)

Rams

Reputation: 46

Please create a new topic and keep the property ConsumerConfig.AUTO_OFFSET_RESET_CONFIG set to "latest". Make sure not to commit the offset, i.e. do not call commitSync().

By default, receivers start consuming records from the last committed offset of each assigned partition. If a committed offset is not available, the offset reset strategy ConsumerConfig#AUTO_OFFSET_RESET_CONFIG configured for the KafkaConsumer is used to set the start offset to the earliest or latest offset on the partition.

I think that in your case you are committing the offset, or a committed offset is already available for the given topic.
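
The rule quoted above can be sketched as a small decision function. This is only a toy model for illustration, not Kafka's actual code: the class `StartOffsetModel`, the method `resolve`, and its parameters are hypothetical names, and only the "earliest" and "latest" policies are modelled.

```java
import java.util.OptionalLong;

// Toy model of how a consumer picks its starting offset for one partition.
// Hypothetical helper for illustration; this is not part of the Kafka client.
final class StartOffsetModel {

    /**
     * @param committed   offset stored for the consumer group, if any
     * @param resetPolicy value of auto.offset.reset ("earliest" or "latest")
     * @param logStart    first offset still available in the partition
     * @param logEnd      offset that the next published record will receive
     */
    static long resolve(OptionalLong committed, String resetPolicy,
                        long logStart, long logEnd) {
        // A committed offset always wins; the reset policy is not consulted.
        if (committed.isPresent()) {
            return committed.getAsLong();
        }
        // No committed offset: fall back to the reset policy.
        switch (resetPolicy) {
            case "earliest":
                return logStart;
            case "latest":
                return logEnd;
            default:
                throw new IllegalArgumentException("policy not modelled: " + resetPolicy);
        }
    }

    public static void main(String[] args) {
        // Partition holding the 10 records from the question: offsets 0..9, next offset 10.
        System.out.println(resolve(OptionalLong.empty(), "earliest", 0, 10)); // prints 0
        System.out.println(resolve(OptionalLong.empty(), "latest", 0, 10));   // prints 10
        System.out.println(resolve(OptionalLong.of(4), "latest", 0, 10));     // prints 4
    }
}
```

In this model, a group with a committed offset of 4 resumes at 4 even with "latest", which is why an already-used group id can still receive old messages.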

Upvotes: 0

Rambler

Reputation: 5482

Set the following property:

consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

It tells the consumer to read only the latest messages, that is, the messages published after the consumer started. Note that this setting only takes effect when the consumer group has no committed offset.
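
One possible way to make sure "latest" actually applies is to give each run a group id that has never committed anything. The following is a minimal sketch under that assumption, using the same plain string keys as the question. The class `LiveOnlyConsumerProps`, the method `build`, and the "live-only-" prefix are hypothetical names chosen for illustration.

```java
import java.util.Properties;
import java.util.UUID;

// Builds consumer properties intended to skip records published before the consumer started.
// Hypothetical helper for illustration; adjust the values to your own setup.
final class LiveOnlyConsumerProps {

    static Properties build(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        // Assumption: a fresh, random group id per run, so no committed offset exists
        // and the reset policy below is the one that decides the start position.
        props.put("group.id", "live-only-" + UUID.randomUUID());
        // With no committed offset, start at the end of each partition.
        props.put("auto.offset.reset", "latest");
        // Nothing needs to be committed, since the group id is thrown away after the run.
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("localhost:9092");
        System.out.println(props.getProperty("auto.offset.reset")); // prints latest
    }
}
```

The resulting `Properties` object would be passed to the `KafkaConsumer` constructor as in the question. An alternative that keeps a stable group id is to call `seekToEnd` on the assigned partitions before polling. In either case, records published before the consumer has received its partition assignment may still be skipped.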

Upvotes: 2
