Kafka listener, get all messages

Good day collegues. I have Kafka project using Spring Kafka what listen a definite topic. I need one time in a day listen all messages, put them into a collection and find specific message there. I couldn't understand how to read all messages in one @KafkaListener method. My class is:

@Component
public class KafkaIntervalListener {

    public CountDownLatch intervalLatch = new CountDownLatch(1);
    private final SCDFRunnerService scdfRunnerService;

    public KafkaIntervalListener(SCDFRunnerService scdfRunnerService) {
        this.scdfRunnerService = scdfRunnerService;
    }

    @KafkaListener(topics = "${kafka.interval-topic}", containerFactory = "intervalEventKafkaListenerContainerFactory")
    public void intervalListener(IntervalEvent event) throws UnsupportedEncodingException, JSONException {
        System.out.println("Recieved interval message: " + event);
        IntervalType type = event.getType();
        Instant instant = event.getInterval();
        List<IntervalEvent> events = new ArrayList<>();
        events.add(event);
        events.size();


        this.intervalLatch.countDown();
    }

}

My events collection always has size = 1; I tried to use different loops, but then, my collection become filed 530 000 000 times the same message.

UPDATE: I have found a way to do it with factory.setBatchListener(true); But i need to find launch it with @Scheduled(cron = "${kafka.cron}", zone = "Europe/Moscow"). Right now this method is always is listening. Now iam trying something like this:

    @Scheduled(cron = "${kafka.cron}", zone = "Europe/Moscow")
public void run() throws Exception {
    kafkaIntervalListener.intervalLatch.await();
}

It doesn't work, in debug mode my breakpoint never works on this site.

Upvotes: 1

Views: 3615

Answers (1)

Gary Russell
Gary Russell

Reputation: 174779

The listener container is, by design, message-driven.

For fetching messages on-demand, it's better to use the Kafka Consumer API directly and fetch messages using the poll() method.

Upvotes: 4

Related Questions