I am working on a Scala application that uses Kafka. My Kafka consumer code is as follows:
def getValues(topic: String): String = {
  val props = new Properties()
  props.put("group.id", "test")
  props.put("bootstrap.servers", "localhost:9092")
  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  props.put("auto.offset.reset", "earliest")
  val consumer: KafkaConsumer[String, String] = new KafkaConsumer[String, String](props)
  val topicPartition = new TopicPartition(topic, 0)
  consumer.assign(util.Collections.singletonList(topicPartition))
  val offset = consumer.position(topicPartition) - 1
  val record = consumer.poll(Duration.ofMillis(500)).asScala
  for (data <- record)
    if(data.offset() == offset) val value = data.value()
  return value
}
Here I just want to return the latest value. When I run my application, I get the following log:
Resetting offset for partition topic-0 to offset 0
Because of this, val offset = consumer.position(topicPartition) - 1 evaluates to -1, and data.offset() runs through the offsets of all records in the partition, so I don't get the latest value. Why is the offset automatically reset to 0? How can I correct it? What is the mistake in my code, or is there any other way to get the value at the latest offset?
Upvotes: 1
Views: 3287
Reputation: 18475
You are looking for the seek method which, according to the JavaDocs, "overrides the fetch offsets that the consumer will use on the next poll(timeout)".
Also make sure that you set props.put("auto.offset.reset", "latest").
With those two amendments to your code, the following worked for me to fetch only the value at the latest offset of partition 0 in the selected topic:
import java.time.Duration
import java.util.Properties
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import collection.JavaConverters._

def getValues(topic: String): String = {
  val props = new Properties()
  props.put("group.id", "test")
  props.put("bootstrap.servers", "localhost:9092")
  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  props.put("auto.offset.reset", "latest")
  val consumer: KafkaConsumer[String, String] = new KafkaConsumer[String, String](props)
  try {
    val topicPartition = new TopicPartition(topic, 0)
    consumer.assign(java.util.Collections.singletonList(topicPartition))
    // position() is the offset of the next record to fetch, so the last written record sits one before it
    val offset = consumer.position(topicPartition) - 1
    consumer.seek(topicPartition, offset)
    val records = consumer.poll(Duration.ofMillis(500)).asScala
    // you are only reading one message if no new messages flow into the Kafka topic
    // lastOption keeps the newest polled value; null is returned if the poll came back empty
    records.lastOption.map(_.value()).orNull
  } finally consumer.close()
}
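One caveat with the code above: as far as I understand, position() starts from the committed offset of the group when one exists, so on later runs it may not point at the end of the partition. A minimal sketch of a variant that asks the broker for the end offset instead is shown below. The names lastRecordOffset and getLatestValue are hypothetical, returning an Option and disabling auto-commit are my own choices, and the sketch assumes a plain (non-transactional) topic on the same local broker.

```scala
import java.time.Duration
import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import scala.collection.JavaConverters._

// Hypothetical helper: the end offset is the offset the next written record
// will receive, so the last existing record (if any) sits one before it.
def lastRecordOffset(endOffset: Long): Option[Long] =
  if (endOffset > 0) Some(endOffset - 1) else None

// Hypothetical variant of getValues: None for an empty partition or an empty poll.
def getLatestValue(topic: String): Option[String] = {
  val props = new Properties()
  props.put("group.id", "test")
  props.put("bootstrap.servers", "localhost:9092")
  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  // Assumption: we only peek at the topic, so no offsets should be committed.
  props.put("enable.auto.commit", "false")
  val consumer = new KafkaConsumer[String, String](props)
  try {
    val tp = new TopicPartition(topic, 0)
    consumer.assign(Collections.singletonList(tp))
    // endOffsets queries the broker directly and ignores committed offsets.
    val endOffset: Long = consumer.endOffsets(Collections.singletonList(tp)).get(tp)
    lastRecordOffset(endOffset).flatMap { offset =>
      consumer.seek(tp, offset)
      // Pick the record at the computed offset, even if newer ones arrived meanwhile.
      consumer.poll(Duration.ofMillis(500)).asScala
        .find(_.offset() == offset)
        .map(_.value())
    }
  } finally consumer.close()
}
```

Calling getLatestValue("topic") should then yield Some(value) for a non-empty partition and None otherwise. A 500 ms poll can still come back empty on a slow broker, so a retry loop or a longer timeout may be needed in practice.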
Upvotes: 1
Reputation: 11
In the line props.put("auto.offset.reset", "earliest"), you set the auto.offset.reset parameter of your Kafka consumer to earliest. When the consumer has no valid committed offset, this resets its position to the earliest available offset. If you want the latest value, use latest instead.
You can find the documentation here.
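To make the effect of the setting concrete, here is a small sketch of how the starting position is chosen. It is a simplified model written for this answer, not the Kafka client's actual code: ResetPolicy and startingPosition are hypothetical names, and the "none" policy (which raises an error instead of resetting) is left out.

```scala
// Simplified model of auto.offset.reset; all names here are hypothetical.
sealed trait ResetPolicy
case object Earliest extends ResetPolicy
case object Latest extends ResetPolicy

def startingPosition(
    committed: Option[Long], // committed offset of the group, if any
    beginningOffset: Long,   // first offset still available in the partition
    endOffset: Long,         // offset the next written record will get
    policy: ResetPolicy
): Long = committed match {
  // A valid committed offset wins; the reset policy is not consulted.
  case Some(offset) if offset >= beginningOffset && offset <= endOffset => offset
  // No committed offset, or one that is out of range: apply the policy.
  case _ =>
    policy match {
      case Earliest => beginningOffset
      case Latest   => endOffset
    }
}
```

With no committed offset and a partition holding offsets 0 to 4, startingPosition(None, 0L, 5L, Earliest) gives 0, which matches the "Resetting offset for partition topic-0 to offset 0" log line and explains why position - 1 became -1. With Latest the result is 5, so position - 1 is 4, the offset of the last record.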
Upvotes: 0