Reputation: 4992
I have a requirement where I need to find the recently inserted message from Kafka topic. How can I achieve this?
I tried to fetch offset first and trying to get messages from that offset? Is it efficient solution?
val config = KafkaConfig()
val props = new Properties()
// ConsumerConfig
props.put("bootstrap.servers", config.bootstrapServers)
props.put("group.id", "stream-latest-consumer")
props.put(
"key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer"
)
props.put(
"value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer"
)
val kafkaConsumer = new KafkaConsumer[String, String](props)
val p = new TopicPartition(config.topic, 0)
val cl: util.Collection[TopicPartition] = List(p).asJava
val offsetsMap: java.util.Map[TopicPartition, java.lang.Long] =
kafkaConsumer.endOffsets(cl)
val offsetCount = offsetsMap.get(p)
Upvotes: 1
Views: 365
Reputation: 39790
You can also use
void seekToEnd(Collection<TopicPartition> partitions)
in order to get the latest offset for the given partitions.
Upvotes: 1