Reputation: 536
I'm using the @KafkaListener annotation in my Spring Boot app, which works just fine. Offsets are stored by Kafka itself.
Now let's say one of the consumers of a topic deploys a new version and screws up two hours' worth of messages. Then they fix the app and want to start the new version from the offset of two hours ago.
I could use consumer.seek() after a consumer.offsetsForTimes() call, but that is only straightforward when I'm using the polling mechanism, not when using the Spring KafkaListener.
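To make it concrete, the polling variant I have in mind looks roughly like the sketch below. The class name RewindByTime, the topic "orders", the group id and the broker address are placeholders rather than values from my app, and I'm assuming a kafka-clients version that has poll(Duration). Because I create the consumer myself here, the rebalance listener can simply capture it.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

public class RewindByTime {

    /** Seeks each partition to the earliest offset whose timestamp is >= timestampMs. */
    static void seekToTimestamp(Consumer<?, ?> consumer,
                                Collection<TopicPartition> partitions,
                                long timestampMs) {
        if (partitions.isEmpty()) {
            return;
        }
        Map<TopicPartition, Long> query = new HashMap<>();
        for (TopicPartition tp : partitions) {
            query.put(tp, timestampMs);
        }
        Map<TopicPartition, OffsetAndTimestamp> found = consumer.offsetsForTimes(query);
        for (Map.Entry<TopicPartition, OffsetAndTimestamp> e : found.entrySet()) {
            // A null value means no record at or after the timestamp; leave that partition as is.
            if (e.getValue() != null) {
                consumer.seek(e.getKey(), e.getValue().offset());
            }
        }
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("group.id", "my-group");                // placeholder group id
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        long twoHoursAgo = System.currentTimeMillis() - Duration.ofHours(2).toMillis();

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("orders"), new ConsumerRebalanceListener() {
                // Partitions already rewound, so a later rebalance does not rewind them again.
                private final Set<TopicPartition> rewound = new HashSet<>();

                @Override
                public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                    // Nothing to do for this sketch.
                }

                @Override
                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                    List<TopicPartition> fresh = new ArrayList<>(partitions);
                    fresh.removeAll(rewound);
                    seekToTimestamp(consumer, fresh, twoHoursAgo);
                    rewound.addAll(fresh);
                }
            });

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> r : records) {
                    System.out.printf("%s-%d@%d: %s%n", r.topic(), r.partition(), r.offset(), r.value());
                }
            }
        }
    }
}
```

The part I can't reproduce with the Spring listener is the captured consumer reference inside onPartitionsAssigned.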
I tried using a ConsumerRebalanceListener, but to set the offsets I need the consumer to be active, which I don't have when instantiating the ConsumerRebalanceListener...
Do I need to switch to polling? Can I get at the consumer before my KafkaListener is invoked the first time, but after the consumer has been assigned partition(s)?
Upvotes: 3
Views: 1739
Reputation: 174709
Not currently; the upcoming 2.0 release introduces ConsumerAwareMessageListener, which supports passing the Consumer as an argument into the @KafkaListener method.
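As a rough sketch of what that could look like, assuming 2.0 resolves a Consumer parameter on the annotated method as described: the class name ConsumerArgListener, the describe helper and the topic "orders" are made up for illustration, and the listener only logs the current assignment to show that the consumer is reachable.

```java
import org.apache.kafka.clients.consumer.Consumer;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class ConsumerArgListener {

    // Hypothetical helper: builds a log line from the payload and the
    // partitions currently assigned to the consumer that was passed in.
    static String describe(String message, Consumer<?, ?> consumer) {
        return "received '" + message + "' with assignment " + consumer.assignment();
    }

    @KafkaListener(topics = "orders") // hypothetical topic name
    public void listen(String message, Consumer<?, ?> consumer) {
        // With the consumer available as an argument, calls such as
        // consumer.offsetsForTimes(...) and consumer.seek(...) could be made here.
        System.out.println(describe(message, consumer));
    }
}
```

Note that a seek issued from inside the listener only affects later polls, so records already fetched may still be delivered before the new position takes effect.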
Upvotes: 1