smlgbl

Reputation: 536

Is there a standard way to reset offsets when using the @KafkaListener annotation?

I'm using the @KafkaListener annotation in my Spring Boot app, and it works just fine. Offsets are stored by Kafka itself.

Now let's say one of the consumers of a topic deploys a new version and screws up two hours' worth of messages. Then they fix the app and want to start the new version from the offset as of two hours ago.

I could call consumer.seek() with offsets obtained from a previous consumer.offsetsForTimes() call, but that is only straightforward when I run the poll loop myself, not when using the Spring @KafkaListener.
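For reference, this is roughly what I mean by the seek approach with a plain consumer. It is only a sketch: the class and method names (SeekByTimestamp, timestampQuery, rewind, twoHoursAgo) are made up for illustration, and it assumes the consumer already has partitions assigned when rewind() is called.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

// Hypothetical helper, not part of Kafka or Spring.
final class SeekByTimestamp {

    // Build the argument for offsetsForTimes(): the same timestamp for every partition.
    static Map<TopicPartition, Long> timestampQuery(Collection<TopicPartition> partitions,
                                                    long timestampMs) {
        Map<TopicPartition, Long> query = new HashMap<>();
        for (TopicPartition tp : partitions) {
            query.put(tp, timestampMs);
        }
        return query;
    }

    // Move each assigned partition to the earliest offset whose timestamp is >= timestampMs.
    // Assumes the consumer has an assignment (e.g. after the first poll()).
    static void rewind(Consumer<?, ?> consumer, long timestampMs) {
        Map<TopicPartition, OffsetAndTimestamp> found =
                consumer.offsetsForTimes(timestampQuery(consumer.assignment(), timestampMs));
        for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : found.entrySet()) {
            // A null value means the partition has no record at or after the timestamp.
            if (entry.getValue() != null) {
                consumer.seek(entry.getKey(), entry.getValue().offset());
            }
        }
    }

    // Epoch milliseconds for "two hours before now".
    static long twoHoursAgo() {
        return Instant.now().minus(Duration.ofHours(2)).toEpochMilli();
    }
}
```

In a hand-written poll loop I would call SeekByTimestamp.rewind(consumer, SeekByTimestamp.twoHoursAgo()) once after the first assignment, then keep polling as usual.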

I tried using a ConsumerRebalanceListener, but to set the offsets I need the consumer to be active, which I don't have when instantiating the ConsumerRebalanceListener...

Do I need to switch to polling? Can I get at the consumer before my KafkaListener is invoked the first time, but after the consumer has been assigned partition(s)?

Upvotes: 3

Views: 1739

Answers (1)

Gary Russell

Reputation: 174709

Not currently; the upcoming 2.0 release introduces ConsumerAwareMessageListener, which supports passing the Consumer as an argument to the @KafkaListener method.
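A minimal sketch of how that could look once the Consumer is available as a listener argument. Everything here is an assumption made for illustration: the class name RewindingListener, the listener id "rewinder", the topic "some-topic", String keys and values, and a single consumer (concurrency 1). It rewinds once, on the first record delivered.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.annotation.KafkaListener;

// Hypothetical listener; register it as a Spring bean so the annotation is picked up.
class RewindingListener {

    private final long rewindToMs;                       // target timestamp (epoch millis)
    private final AtomicBoolean rewound = new AtomicBoolean(false);

    RewindingListener(long rewindToMs) {
        this.rewindToMs = rewindToMs;
    }

    boolean hasRewound() {
        return rewound.get();
    }

    // Topic and id are placeholders. The Consumer argument requires spring-kafka 2.0+.
    @KafkaListener(id = "rewinder", topics = "some-topic")
    public void listen(ConsumerRecord<String, String> record, Consumer<?, ?> consumer) {
        if (rewound.compareAndSet(false, true)) {
            // First invocation only: look up offsets for the timestamp and seek to them.
            Map<TopicPartition, Long> query = new HashMap<>();
            for (TopicPartition tp : consumer.assignment()) {
                query.put(tp, rewindToMs);
            }
            Map<TopicPartition, OffsetAndTimestamp> found = consumer.offsetsForTimes(query);
            for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : found.entrySet()) {
                if (entry.getValue() != null) {          // null: nothing at or after the timestamp
                    consumer.seek(entry.getKey(), entry.getValue().offset());
                }
            }
            return;                                      // skip the record that triggered the rewind
        }
        // Normal processing goes here.
        System.out.println(record.offset() + ": " + record.value());
    }
}
```

Two caveats with this sketch: the seek only takes effect on the next poll, so records already fetched by the current poll may still be delivered before the rewound ones, and partitions assigned later in a rebalance are not rewound.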

Upvotes: 1
