AJK1305

Reputation: 129

How to read a message from Kafka topic on demand

How can I read a message from a Kafka topic on demand? I have the topic name, the partition ID, and the offset. Using these three parameters, how can I retrieve a specific message from the topic? Is this possible with Spring Kafka? I am using Spring Boot 2.2.4.RELEASE.

Upvotes: 1

Views: 4644

Answers (1)

Gary Russell

Reputation: 174494

  • create a consumer
  • assign the topic/partition
  • seek to the required offset
  • poll for one record
  • close the consumer
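import java.time.Duration;
import java.util.Collections;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.ConsumerFactory;

@SpringBootApplication
public class So64759726Application {

    public static void main(String[] args) {
        SpringApplication.run(So64759726Application.class, args);
    }

    @Bean
    ApplicationRunner runner(ConsumerFactory<String, String> cf) {
        return args -> {
            // try-with-resources closes the consumer when the block exits
            try (Consumer<String, String> consumer = cf.createConsumer()) {
                TopicPartition tp = new TopicPartition("so64759726", 0);
                consumer.assign(Collections.singleton(tp)); // manual assignment, no group subscription
                consumer.seek(tp, 2); // offset of the record to fetch
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
                // poll() returns an empty batch if nothing arrives within the timeout
                if (records.isEmpty()) {
                    System.out.println("No record received within the poll timeout");
                } else {
                    System.out.println(records.iterator().next().value());
                }
            }
        };
    }

}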

application.properties

spring.kafka.consumer.max-poll-records=1
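
Why the property helps: a single poll() may return up to max.poll.records records (500 by default) starting at the position set by seek(), so limiting it to 1 means the batch holds only the requested record rather than everything after it. The toy model below is only a sketch of that behaviour using plain Java collections; ToyPartition and its methods are hypothetical stand-ins and not the Kafka client API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class SeekPollSketch {

    /** Hypothetical in-memory partition: the list index plays the role of the offset. */
    static final class ToyPartition {
        private final List<String> log;
        private final int maxPollRecords;
        private int position = 0;

        ToyPartition(List<String> log, int maxPollRecords) {
            this.log = log;
            this.maxPollRecords = maxPollRecords;
        }

        /** Moves the read position, like seek(tp, offset) on a real consumer. */
        void seek(int offset) {
            if (offset < 0) {
                throw new IllegalArgumentException("offset must be >= 0");
            }
            this.position = offset;
        }

        /** Returns at most maxPollRecords entries from the current position and advances it. */
        List<String> poll() {
            long wanted = (long) position + maxPollRecords;
            int end = (int) Math.min((long) log.size(), wanted);
            if (position >= end) {
                return new ArrayList<>(); // nothing at or beyond the position
            }
            List<String> batch = new ArrayList<>(log.subList(position, end));
            position = end;
            return batch;
        }
    }

    public static void main(String[] args) {
        List<String> log = Arrays.asList("m0", "m1", "m2", "m3", "m4");

        ToyPartition limited = new ToyPartition(log, 1);
        limited.seek(2);
        System.out.println(limited.poll()); // prints [m2]

        ToyPartition unlimited = new ToyPartition(log, 500);
        unlimited.seek(2);
        System.out.println(unlimited.poll()); // prints [m2, m3, m4]
    }
}
```

With the real consumer the first record in the batch is still the one at the requested offset; the limit just avoids fetching and discarding the records that follow it.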

UPDATE

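Since this answer was posted, KafkaTemplate has gained receive() methods for on-demand consumption (added in Spring for Apache Kafka 2.8). The single-record variants return null if no record is received.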

https://docs.spring.io/spring-kafka/docs/current/reference/html/#kafka-template-receive

ConsumerRecord<K, V> receive(String topic, int partition, long offset);

ConsumerRecord<K, V> receive(String topic, int partition, long offset, Duration pollTimeout);

ConsumerRecords<K, V> receive(Collection<TopicPartitionOffset> requested);

ConsumerRecords<K, V> receive(Collection<TopicPartitionOffset> requested, Duration pollTimeout);

Upvotes: 4
