user3665549
user3665549

Reputation: 43

How to acknowledge specific offset on container.stop in Spring Kafka?

Basically there is opened WebSocket session for logged-in user id = 123 and listener container have been created and started and records from topic notification.user.123 are sent to the client but acknowledgment.acknowledge(); is never really called.

The latest viewed notification and related offset are known. I do not want to store and manage that data and seek for offset. When it is time to close the WebSocket session and stop the listener container I need to acknowledge it. So it will be stored somewhere in broker as the latest offset readed by consumer. And new WebSocket session and listener container will start from that offset naturally.

So far I have tried improvise with no luck with https://docs.spring.io/spring-kafka/reference/html/#seek and

private class MyListener extends AbstractConsumerSeekAware implements AcknowledgingConsumerAwareMessageListener<String, Notification> {

Do I need to look at some listener container life cycle and atStop event?

Upvotes: 0

Views: 483

Answers (1)

Gary Russell
Gary Russell

Reputation: 174554

If you must use manual acks for some reason, since you are receiving all the topic/partition/offset info you can commit the offsets on the Consumer by capturing the ConsumerStoppoingEvents.

The suggested solution is working perfectly with @KafkaListener and @EventListener but there are no events for programmatically created and started KafkaMessageListenerContainer.

To get events to an event listener when Spring doesn't manage the container, call setApplicationEventPublisher(applictionContext) (the application publisher is the application context when an ApplicationEventListenerAware bean is managed by Spring).

Or you can simply inject your own ApplicationEventPublisher implementation and deal with the event(s) there.

Upvotes: 1

Related Questions