Reputation: 1524
Kafka version: 0.9.0.1
If n = 20, I have to get the last 20 messages of a topic.
I tried with
kafkaConsumer.seekToBeginning();
But that retrieves all the messages, and I need only the last 20.
The topic may have hundreds of thousands of records.
public List<JSONObject> consumeMessages(String kafkaTopicName) {
    KafkaConsumer<String, String> kafkaConsumer = null;
    boolean flag = true;
    List<JSONObject> messagesFromKafka = new ArrayList<>();
    int recordCount = 0;
    int i = 0;
    int maxMessagesToReturn = 20;
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "project.group.id");
    props.put("max.partition.fetch.bytes", "1048576000");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    kafkaConsumer = new KafkaConsumer<>(props);
    kafkaConsumer.subscribe(Arrays.asList(kafkaTopicName));
    TopicPartition topicPartition = new TopicPartition(kafkaTopicName, 0);
    LOGGER.info("Subscribed to topic " + kafkaConsumer.listTopics());
    while (flag) {
        // will consume all the messages and store in records
        ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
        kafkaConsumer.seekToBeginning(topicPartition);
        // getting total records count
        recordCount = records.count();
        LOGGER.info("recordCount " + recordCount);
        for (ConsumerRecord<String, String> record : records) {
            if (record.value() != null) {
                if (i >= recordCount - maxMessagesToReturn) {
                    // adding last 20 messages to messagesFromKafka
                    LOGGER.info("kafkaMessage " + record.value());
                    messagesFromKafka.add(new JSONObject(record.value()));
                }
                i++;
            }
        }
        if (recordCount > 0) {
            flag = false;
        }
    }
    kafkaConsumer.close();
    return messagesFromKafka;
}
Upvotes: 3
Views: 17794
Reputation: 3412
You can use kafkaConsumer.seekToEnd(Collection<TopicPartition> partitions) to seek to the last offset of the given partition(s). (In the 0.9 client used in the question, the signature is seekToEnd(TopicPartition... partitions).) As per the documentation:
"Seek to the last offset for each of the given partitions. This function evaluates lazily, seeking to the final offset in all partitions only when poll(Duration) or position(TopicPartition) are called. If no partitions are provided, seek to the final offset for all of the currently assigned partitions."
Then you can retrieve the position of a particular partition using position(TopicPartition partition).
Subtract 20 from that position and use kafkaConsumer.seek(TopicPartition partition, long offset) to move to the start of the most recent 20 messages.
Simply,
kafkaConsumer.seekToEnd(partitionList);
long endPosition = kafkaConsumer.position(topicPartition);
// clamp at 0 in case the partition holds fewer than maxMessagesToReturn messages
long recentMessagesStartPosition = Math.max(0, endPosition - maxMessagesToReturn);
kafkaConsumer.seek(topicPartition, recentMessagesStartPosition);
Now you can retrieve the most recent 20 messages using poll().
This is the basic logic; if the topic has multiple partitions, you have to handle those cases as well. I did not try this, but I hope you get the concept.
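For the multi-partition case, the offset arithmetic can be sketched separately from the consumer calls. This is only an illustration in plain Java, not tested against a broker: the class RecentOffsets and its two methods are hypothetical names, partitions are represented by their numbers, and the end offsets are assumed to have been read beforehand with position(...) after seekToEnd(...).

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: computes where to seek in each partition so that
// at most n records lie between the seek position and the end offset.
class RecentOffsets {

    // Start offset for one partition, clamped at 0 for partitions
    // that hold fewer than n records.
    static long startOffset(long endOffset, int n) {
        return Math.max(0L, endOffset - n);
    }

    // Applies startOffset to every partition: partition number -> offset to seek to.
    static Map<Integer, Long> startOffsets(Map<Integer, Long> endOffsets, int n) {
        Map<Integer, Long> result = new LinkedHashMap<>();
        for (Map.Entry<Integer, Long> e : endOffsets.entrySet()) {
            result.put(e.getKey(), startOffset(e.getValue(), n));
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Integer, Long> endOffsets = new LinkedHashMap<>();
        endOffsets.put(0, 1000L); // partition 0 ends at offset 1000
        endOffsets.put(1, 5L);    // partition 1 holds only 5 records
        System.out.println(startOffsets(endOffsets, 20)); // prints {0=980, 1=0}
    }
}
```

Each resulting offset would then be passed to seek(...) for its partition before calling poll(). Two caveats: this assumes the partitions are already assigned to the consumer (for example via assign(...) rather than subscribe(...)), and clamping at 0 is a simplification, since retention may have removed the oldest records so that the earliest available offset is greater than 0. Note also that Kafka orders records only within a partition, so "the last 20 messages of the topic" is only well defined per partition unless the messages themselves carry an ordering key.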
Upvotes: 6