Rajeev
Rajeev

Reputation: 4992

Get last inserted message from kafka topic

I have a requirement where I need to find the recently inserted message from Kafka topic. How can I achieve this?

I tried to fetch offset first and trying to get messages from that offset? Is it efficient solution?

val config = KafkaConfig()
  val props = new Properties()
//  ConsumerConfig
  props.put("bootstrap.servers", config.bootstrapServers)
  props.put("group.id", "stream-latest-consumer")
  props.put(
    "key.deserializer",
    "org.apache.kafka.common.serialization.StringDeserializer"
  )
  props.put(
    "value.deserializer",
    "org.apache.kafka.common.serialization.StringDeserializer"
  )
  val kafkaConsumer = new KafkaConsumer[String, String](props)

  val p = new TopicPartition(config.topic, 0)
  val cl: util.Collection[TopicPartition] = List(p).asJava
  val offsetsMap: java.util.Map[TopicPartition, java.lang.Long] =
    kafkaConsumer.endOffsets(cl)

  val offsetCount = offsetsMap.get(p)

Upvotes: 1

Views: 365

Answers (1)

Giorgos Myrianthous
Giorgos Myrianthous

Reputation: 39790

You can also use

void seekToEnd(Collection<TopicPartition> partitions)

in order to get the latest offset for the given partitions.

Upvotes: 1

Related Questions