user13964801

How to consume only latest offset in Kafka topic

I am working on a Scala application that uses Kafka. My Kafka consumer code is as follows.

def getValues(topic: String): String  = {
        
  val props = new Properties()
  props.put("group.id", "test")
  props.put("bootstrap.servers", "localhost:9092")
  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") 
  props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  props.put("auto.offset.reset", "earliest")
  val consumer: KafkaConsumer[String, String] = new KafkaConsumer[String, String](props)

  val topicPartition = new TopicPartition(topic, 0)
  consumer.assign(util.Collections.singletonList(topicPartition))
  val offset = consumer.position(topicPartition) - 1
  val record = consumer.poll(Duration.ofMillis(500)).asScala
  for (data <- record)
    if(data.offset() == offset) val value = data.value()
  return value
}

Here I just want to return the latest value. When I run my application, I get the following log:

 Resetting offset for partition topic-0 to offset 0

Because of this, val offset = consumer.position(topicPartition) - 1 becomes -1, and the loop over data.offset() sees every offset in the partition, none of which matches. As a result, I don't get the latest value. Why is the offset automatically reset to 0? How can I correct it? What is the mistake in my code, or is there another way to get the value at the latest offset?

Upvotes: 1

Views: 3287

Answers (2)

Michael Heil

Reputation: 18475

You are looking for the seek method which - according to the JavaDocs - "overrides the fetch offsets that the consumer will use on the next poll(timeout)".

Also make sure that you are setting

props.put("auto.offset.reset", "latest")

With those two amendments to your code, the following worked for me to fetch only the value at the latest offset of partition 0 in the selected topic:

import java.time.Duration
import java.util.Properties
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import collection.JavaConverters._

def getValues(topic: String): String  = {
    val props = new Properties()
    props.put("group.id", "test")
    props.put("bootstrap.servers", "localhost:9092")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("auto.offset.reset", "latest")
    val consumer: KafkaConsumer[String, String] = new KafkaConsumer[String, String](props)

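    val topicPartition = new TopicPartition(topic, 0)
    try {
      consumer.assign(java.util.Collections.singletonList(topicPartition))
      // position() is the offset of the next record to fetch, so the last record sits one before it;
      // clamp at 0 because seek() rejects negative offsets (empty partition)
      val offset = math.max(consumer.position(topicPartition) - 1, 0L)
      consumer.seek(topicPartition, offset)
      val records = consumer.poll(Duration.ofMillis(500)).asScala
      // you are only reading one message if no new messages flow into the Kafka topic
      records.lastOption.map(_.value()).orNull // null if the poll returned nothing
    } finally {
      consumer.close() // always release the consumer's resources
    }
}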
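
The code above relies on position(), which starts from the committed offset of the group when one exists. As a possible alternative, here is a minimal sketch that asks the broker for the end offset directly via endOffsets and seeks to one before it. The object name LatestRecord and the helpers lastRecordOffset and latestValue are hypothetical names of mine, not part of the Kafka API. The sketch assumes the record at that offset has not been removed by retention or compaction, and that a single 500 ms poll is enough to fetch it.

```scala
import java.time.Duration
import java.util.Collections
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import scala.collection.JavaConverters._

object LatestRecord {
  // The end offset is the offset the *next* record will receive,
  // so the last existing record (if any) sits one position before it.
  def lastRecordOffset(endOffset: Long): Option[Long] =
    if (endOffset > 0) Some(endOffset - 1) else None

  // Hypothetical helper: returns the newest value fetched from one partition,
  // or None if the partition is empty or the poll returned nothing in time.
  def latestValue(consumer: KafkaConsumer[String, String],
                  topic: String,
                  partition: Int = 0): Option[String] = {
    val tp = new TopicPartition(topic, partition)
    val partitions = Collections.singletonList(tp)
    consumer.assign(partitions)
    // Ask the broker for the end offset instead of relying on committed offsets
    val end: Long = consumer.endOffsets(partitions).get(tp).longValue()
    lastRecordOffset(end).flatMap { offset =>
      consumer.seek(tp, offset)
      val records = consumer.poll(Duration.ofMillis(500)).records(tp).asScala
      // If new records arrived in the meantime, take the newest one fetched
      records.lastOption.flatMap(r => Option(r.value()))
    }
  }
}
```

Returning an Option makes the "empty partition" and "nothing fetched" cases explicit to the caller instead of hiding them behind null. The caller still owns the consumer and is responsible for closing it.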

Upvotes: 1

clay
clay

Reputation: 11

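In this line, props.put("auto.offset.reset", "earliest"), you set the auto.offset.reset parameter of your Kafka consumer to earliest. When the consumer has no valid committed offset for the partition, this resets its position to the earliest available offset, which is what the log line in your question shows. If you want the latest value, you should use latest instead. You can find the documentation here.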
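
To make the effect of this setting concrete, here is a simplified model of how the starting position is chosen. It is plain Scala, not Kafka code: OffsetResetModel, startingPosition and all parameter names are hypothetical, and the real client checks for out-of-range offsets at fetch time rather than up front.

```scala
object OffsetResetModel {
  // Simplified model (an assumption, not the client's actual implementation):
  // a committed offset inside the valid range wins; otherwise the
  // auto.offset.reset policy decides where the consumer starts.
  def startingPosition(committed: Option[Long],
                       resetPolicy: String,
                       beginOffset: Long,
                       endOffset: Long): Long =
    committed
      .filter(c => c >= beginOffset && c <= endOffset)
      .getOrElse {
        resetPolicy match {
          case "earliest" => beginOffset // start of the partition
          case "latest"   => endOffset   // only records written from now on
          case other =>
            throw new IllegalStateException(
              s"no valid offset and auto.offset.reset=$other")
        }
      }
}
```

With no committed offset and earliest, the model yields the beginning offset, which matches the "Resetting offset for partition topic-0 to offset 0" message in the question. With latest it yields the end offset, so subtracting 1 points at the last record when the partition is not empty.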

Upvotes: 0
