Reputation: 505
Good day, colleagues. I have a Kafka project using Spring Kafka that listens to a specific topic. Once a day I need to read all the messages, put them into a collection, and find a specific message there. I can't figure out how to read all the messages in one @KafkaListener method. My class is:
@Component
public class KafkaIntervalListener {
    public CountDownLatch intervalLatch = new CountDownLatch(1);
    private final SCDFRunnerService scdfRunnerService;

    public KafkaIntervalListener(SCDFRunnerService scdfRunnerService) {
        this.scdfRunnerService = scdfRunnerService;
    }

    @KafkaListener(topics = "${kafka.interval-topic}", containerFactory = "intervalEventKafkaListenerContainerFactory")
    public void intervalListener(IntervalEvent event) throws UnsupportedEncodingException, JSONException {
        System.out.println("Recieved interval message: " + event);
        IntervalType type = event.getType();
        Instant instant = event.getInterval();
        List<IntervalEvent> events = new ArrayList<>();
        events.add(event);
        events.size();
        this.intervalLatch.countDown();
    }
}
My events collection always has size 1. I tried different loops, but then my collection was filled with the same message 530 000 000 times.
UPDATE: I have found a way to do it with factory.setBatchListener(true). But I need to launch it with @Scheduled(cron = "${kafka.cron}", zone = "Europe/Moscow"). Right now this method is always listening. Now I am trying something like this:
@Scheduled(cron = "${kafka.cron}", zone = "Europe/Moscow")
public void run() throws Exception {
    kafkaIntervalListener.intervalLatch.await();
}
It doesn't work: in debug mode, my breakpoint at this point is never hit.
Upvotes: 1
Views: 3615
Reputation: 174779
The listener container is, by design, message-driven.
For fetching messages on demand, it's better to use the Kafka Consumer API directly and fetch messages using the poll() method.
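As a rough illustration of the on-demand approach, here is a minimal sketch of the drain loop: keep polling until several polls in a row come back empty, collect everything, then search the collection. To keep it runnable without a broker, the poll source is a plain Supplier that stands in for a call such as consumer.poll(Duration.ofSeconds(1)) on a KafkaConsumer; the class name OnDemandDrain, the methods drain and findFirst, and the maxEmptyPolls parameter are hypothetical names chosen for this example, not part of Kafka or Spring.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;
import java.util.function.Supplier;

public class OnDemandDrain {

    // Calls poll repeatedly and collects every record until maxEmptyPolls
    // consecutive polls return nothing. "poll" is a stand-in (assumption):
    // with the real client it would wrap consumer.poll(Duration) and copy
    // the record values into a List.
    public static <T> List<T> drain(Supplier<List<T>> poll, int maxEmptyPolls) {
        List<T> all = new ArrayList<>();
        int emptyPolls = 0;
        while (emptyPolls < maxEmptyPolls) {
            List<T> batch = poll.get();
            if (batch.isEmpty()) {
                emptyPolls++;          // nothing arrived, count towards giving up
            } else {
                emptyPolls = 0;        // data arrived, reset the counter
                all.addAll(batch);
            }
        }
        return all;
    }

    // Searches the collected records for the first one matching the predicate.
    public static <T> Optional<T> findFirst(List<T> records, Predicate<T> wanted) {
        return records.stream().filter(wanted).findFirst();
    }

    public static void main(String[] args) {
        // Fake poll source: two batches separated by one empty poll.
        Iterator<List<String>> fake =
                List.of(List.of("a", "b"), List.<String>of(), List.of("c")).iterator();
        Supplier<List<String>> poll =
                () -> fake.hasNext() ? fake.next() : List.<String>of();

        List<String> all = drain(poll, 2);
        System.out.println(all);
        System.out.println(findFirst(all, "c"::equals));
    }
}
```

In the scenario from the question, the @Scheduled method would call something like drain once per trigger instead of waiting on a latch. Since KafkaConsumer is not thread-safe, it is probably simplest to create, use, and close the consumer inside that scheduled method.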
Upvotes: 4