user14010998
user14010998

Reputation:

How to get Kafka messages based on timestamp

I am working on a application in which I am using kafka and tech is scala. My kafka consumer code is as follows:

val props = new Properties()
        props.put("group.id", "test")
        props.put("bootstrap.servers", "localhost:9092")
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
        props.put("auto.offset.reset", "earliest")
        props.put("group.id", "consumer-group")
    val consumer: KafkaConsumer[String, String] = new KafkaConsumer[String, String](props)
    consumer.subscribe(util.Collections.singletonList(topic))
    val record = consumer.poll(Duration.ofMillis(500)).asScala.toList

It gives me all the records but the thing is I already have data in kafka consumer which may lead to duplicate data means data with same key can already be there in topic. Is there is any way by which I can retrieve data from a particular time. Means before polling if I can calculate current time and retrieve only those records which came after that time. Any way I can achieve this?

Upvotes: 2

Views: 17554

Answers (2)

Michael Heil
Michael Heil

Reputation: 18475

You can use the offsetsForTimes method in the KafkaConsumer API.

Code

import java.time.Duration
import java.util.Properties
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import collection.JavaConverters._

object OffsetsForTime extends App {

  implicit def toJavaOffsetQuery(offsetQuery: Map[TopicPartition, scala.Long]): java.util.Map[TopicPartition, java.lang.Long] =
    offsetQuery
      .map { case (tp, time) => tp -> new java.lang.Long(time) }
      .asJava

  val topic = "myInTopic"
  val timestamp: Long = 1595971151000L

  val props = new Properties()
  props.put("group.id", "group-id1337")
  props.put("bootstrap.servers", "localhost:9092")
  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  props.put("auto.offset.reset", "earliest")
  val consumer: KafkaConsumer[String, String] = new KafkaConsumer[String, String](props)

  val topicPartition = new TopicPartition(topic, 0)
  consumer.assign(java.util.Collections.singletonList(topicPartition))
  // dummy poll before calling seek
  consumer.poll(Duration.ofMillis(500))

  // get next available offset after given timestamp
  val (_, offsetAndTimestamp) = consumer.offsetsForTimes(Map(topicPartition -> timestamp)).asScala.head
  // seek to offset
  consumer.seek(topicPartition, offsetAndTimestamp.offset())

  // poll data
  val record = consumer.poll(Duration.ofMillis(500)).asScala.toList

  for (data <- record) {
    println(s"Timestamp: ${data.timestamp()}, Key: ${data.key()}, Value: ${data.value()}")
  }

}

Test

./kafka/current/bin/kafconsole-consumer.sh --bootstrap-server localhost:9092 --topic myInTopic --from-beginning --property print.value=true --property print.timestamp=true
CreateTime:1595971142560    1_old
CreateTime:1595971147697    2_old
CreateTime:1595971150136    3_old
CreateTime:1595971192649    1_new
CreateTime:1595971194489    2_new
CreateTime:1595971196416    3_new

Selecting the timestamp to a time between 3_old and 1_new to only consume the "new" messages.

Output

Timestamp: 1595971192649, Key: null, Value: 1_new
Timestamp: 1595971194489, Key: null, Value: 2_new
Timestamp: 1595971196416, Key: null, Value: 3_new

Upvotes: 3

OneCricketeer
OneCricketeer

Reputation: 191743

The only way to consume from any given timestamp is to

  1. Lookup offsetsForTimes
  2. seek to and commitSync that result
  3. Begin polling

But, you need to be conscious that the data stream is continuous and there may again be repeated keys later.


If you have the same key in data, that you would like to only see the latest of, then you'd be better off using a KTable

Upvotes: 3

Related Questions