Reputation: 5243
I am using Spring Boot's @KafkaListener to monitor a server's heartbeat messages:
@KafkaListener(topics = "heartbeat-topic", groupId = "monitor")
public void listenToHeartbeatMsg(String message) {}
The issue is that when the subscriber application starts, it receives previously published heartbeat messages, even though the server is down.
How can I fix this issue and listen only for real-time heartbeat messages?
Upvotes: 0
Views: 988
Reputation: 174779
Implement ConsumerSeekAware and, in onPartitionsAssigned, call seekToEnd on the callback so that the consumer skips records already in the topic.
See https://docs.spring.io/spring-kafka/docs/current/reference/html/#seek
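import java.util.Map;
import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.listener.ConsumerSeekAware;

public class MyListener implements ConsumerSeekAware {
    ...
    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        // Start each newly assigned partition at its end so older heartbeats are skipped
        callback.seekToEnd(assignments.keySet());
    }
}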
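As an additional guard alongside the seek, the listener could also ignore any record whose timestamp is older than some threshold. This is a minimal sketch in plain Java, not part of Spring Kafka: the class name HeartbeatFreshness, the maxAge parameter, and the 30-second value in the usage note are all hypothetical, and it assumes the record timestamp (for example from ConsumerRecord.timestamp()) is in epoch milliseconds.

```java
import java.time.Duration;

// Hypothetical helper (not a Spring Kafka class): decides whether a heartbeat
// is recent enough to be treated as live.
final class HeartbeatFreshness {

    private final Duration maxAge;

    HeartbeatFreshness(Duration maxAge) {
        if (maxAge.isNegative()) {
            throw new IllegalArgumentException("maxAge must not be negative");
        }
        this.maxAge = maxAge;
    }

    // recordTimestampMs: epoch millis of the record, e.g. ConsumerRecord.timestamp()
    // nowMs: current epoch millis, e.g. System.currentTimeMillis()
    // A timestamp slightly in the future (clock skew) is treated as fresh.
    boolean isFresh(long recordTimestampMs, long nowMs) {
        return nowMs - recordTimestampMs <= maxAge.toMillis();
    }
}
```

Inside the listener method, one might create new HeartbeatFreshness(Duration.ofSeconds(30)) once and return early whenever isFresh(...) is false. The threshold would need to be tuned to the actual heartbeat interval.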
Upvotes: 1