Reputation: 43
Basically, a WebSocket session is open for the logged-in user (id = 123), a listener container has been created and started, and records from the topic notification.user.123 are sent to the client, but acknowledgment.acknowledge(); is never actually called.
The latest viewed notification and its offset are known. I do not want to store and manage that data myself and then seek to the offset. When it is time to close the WebSocket session and stop the listener container, I need to acknowledge that offset so that the broker stores it as the latest offset read by the consumer. A new WebSocket session and listener container will then naturally start from that offset.
So far I have tried to improvise, with no luck, using https://docs.spring.io/spring-kafka/reference/html/#seek and
private class MyListener extends AbstractConsumerSeekAware implements AcknowledgingConsumerAwareMessageListener<String, Notification> {
Do I need to look at the listener container life cycle and some kind of "at stop" event?
Upvotes: 0
Views: 483
Reputation: 174554
If you must use manual acks for some reason, then, since you are receiving all the topic/partition/offset info, you can commit the offsets on the Consumer yourself by capturing ConsumerStoppingEvents.
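As a rough illustration of the bookkeeping this implies, here is a minimal sketch using only the JDK. `OffsetTracker`, `Partition`, `markViewed` and `offsetsToCommit` are hypothetical names made up for this example, not part of Spring Kafka. It assumes the listener calls `markViewed` for each notification the user has actually seen. Note that Kafka expects the committed value to be the offset of the next record to read, so the sketch adds 1 to the last viewed offset.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical helper: remembers the highest viewed offset per topic/partition. */
final class OffsetTracker {

    /** Stand-in for Kafka's TopicPartition so the sketch needs only the JDK. */
    record Partition(String topic, int partition) {}

    // Thread-safe because the WebSocket thread and the consumer thread may both touch it.
    private final Map<Partition, Long> lastViewed = new ConcurrentHashMap<>();

    /** Record that the user has viewed the record at this offset; keeps the maximum seen. */
    void markViewed(String topic, int partition, long offset) {
        lastViewed.merge(new Partition(topic, partition), offset, Math::max);
    }

    /** Offsets to commit: the position of the NEXT record to read, i.e. last viewed + 1. */
    Map<Partition, Long> offsetsToCommit() {
        Map<Partition, Long> next = new HashMap<>();
        lastViewed.forEach((partition, offset) -> next.put(partition, offset + 1));
        return next;
    }
}
```

In the handler for the stopping event, each entry would presumably be converted to a `TopicPartition` key and an `OffsetAndMetadata` value and passed to `commitSync(...)` on the consumer obtained from the event; that wiring is left out here because it depends on your listener setup.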
The suggested solution works perfectly with @KafkaListener and @EventListener, but no events are delivered for a programmatically created and started KafkaMessageListenerContainer.
To get events to an event listener when Spring doesn't manage the container, call setApplicationEventPublisher(applicationContext) on the container (the application event publisher is the application context when an ApplicationEventPublisherAware bean is managed by Spring).
Or you can simply inject your own ApplicationEventPublisher implementation and deal with the event(s) there.
Upvotes: 1