praveen kumar

Reputation: 1524

Retrieve last n messages of Kafka consumer from a particular topic

Kafka version: 0.9.0.1

If n = 20, I have to get last 20 messages of a topic.

I tried with

kafkaConsumer.seekToBeginning();

But it retrieves all the messages. I need to get only the last 20 messages.

This topic may have hundreds of thousands of records.

public List<JSONObject> consumeMessages(String kafkaTopicName) {
  KafkaConsumer<String, String> kafkaConsumer = null;
  boolean flag = true;
  List<JSONObject> messagesFromKafka = new ArrayList<>();
  int recordCount = 0;
  int i = 0;
  int maxMessagesToReturn = 20;

  Properties props = new Properties();         
  props.put("bootstrap.servers", "localhost:9092");
  props.put("group.id", "project.group.id");
  props.put("max.partition.fetch.bytes", "1048576000");
  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  kafkaConsumer = new KafkaConsumer<>(props);

  kafkaConsumer.subscribe(Arrays.asList(kafkaTopicName));
  TopicPartition topicPartition = new TopicPartition(kafkaTopicName, 0);
  LOGGER.info("Subscribed to topic " + kafkaConsumer.listTopics());
  while (flag) {
    // will consume all the messages and store in records
    ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
    kafkaConsumer.seekToBeginning(topicPartition);

    // getting total records count
    recordCount = records.count();
    LOGGER.info("recordCount " + recordCount);
    for (ConsumerRecord<String, String> record : records) {
      if(record.value() != null) {
        if (i >= recordCount - maxMessagesToReturn) {
          // adding last 20 messages to messagesFromKafka
          LOGGER.info("kafkaMessage "+record.value());
          messagesFromKafka.add(new JSONObject(record.value()));
        }
        i++;
      }
    }
    if (recordCount > 0) {
      flag = false;
    }
  }
  kafkaConsumer.close();
  return messagesFromKafka;
}

Upvotes: 3

Views: 17794

Answers (1)

ThisaruG

Reputation: 3412

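You can use kafkaConsumer.seekToEnd(Collection<TopicPartition> partitions) to seek to the last offset of the given partition(s). Note that this is the signature from later client versions; in 0.9.0.1 the method takes varargs instead, seekToEnd(TopicPartition... partitions). As per the documentation: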

"Seek to the last offset for each of the given partitions. This function evaluates lazily, seeking to the final offset in all partitions only when poll(Duration) or position(TopicPartition) are called. If no partitions are provided, seek to the final offset for all of the currently assigned partitions."

Then you can retrieve the position of a particular partition using position(TopicPartition partition).

Then you can subtract 20 from that position and use kafkaConsumer.seek(TopicPartition partition, long offset) to move to the start of the most recent 20 messages.

Simply,

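// 0.9.0.1 takes varargs here; later versions take a Collection<TopicPartition>
kafkaConsumer.seekToEnd(topicPartition);
long endPosition = kafkaConsumer.position(topicPartition);
// clamp at 0 so a partition holding fewer than 20 records does not yield a negative offset
long recentMessagesStartPosition = Math.max(0L, endPosition - maxMessagesToReturn);
kafkaConsumer.seek(topicPartition, recentMessagesStartPosition);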

Now you can retrieve the most recent 20 messages using poll().

This is the simple logic, but if you have multiple partitions, you have to consider those cases as well. I did not try this, but hope you'll get the concept.
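For the multi-partition case, the offset arithmetic can be sketched on its own, without a broker. The class and method names below (RecentOffsets, startOffset, startOffsets) are hypothetical, and the offset ranges in main are made-up sample values. The sketch assumes you have already read each partition's first retained offset (for example via seekToBeginning followed by position) and its end offset (seekToEnd followed by position). It clamps to the first retained offset rather than 0, since retention may have deleted older records.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: computes where to seek in each partition so that
// at most n records remain between the seek position and the end offset.
public class RecentOffsets {

    // beginningOffset: first offset still retained in the partition.
    // endOffset: offset of the next record to be written (what position()
    // reports after seekToEnd).
    public static long startOffset(long beginningOffset, long endOffset, int n) {
        return Math.max(beginningOffset, endOffset - n);
    }

    // ranges maps partition id -> {beginningOffset, endOffset}.
    public static Map<Integer, Long> startOffsets(Map<Integer, long[]> ranges, int n) {
        Map<Integer, Long> result = new LinkedHashMap<>();
        for (Map.Entry<Integer, long[]> entry : ranges.entrySet()) {
            long[] range = entry.getValue();
            result.put(entry.getKey(), startOffset(range[0], range[1], n));
        }
        return result;
    }

    public static void main(String[] args) {
        // Sample values only, not taken from a real topic.
        Map<Integer, long[]> ranges = new LinkedHashMap<>();
        ranges.put(0, new long[] {0L, 100_000L}); // large partition
        ranges.put(1, new long[] {0L, 5L});       // fewer than 20 records
        ranges.put(2, new long[] {40L, 50L});     // older records already deleted
        System.out.println(startOffsets(ranges, 20)); // prints {0=99980, 1=0, 2=40}
    }
}
```

Each computed value would then be passed to kafkaConsumer.seek for its partition. Two caveats: seek only works on partitions that are currently assigned to the consumer, so assign() is probably a better fit here than subscribe(); and this yields up to 20 records per partition, not 20 for the whole topic. Kafka only orders records within a partition, so picking the overall "last 20" needs an ordering criterion of your own.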

Upvotes: 6
