Reputation: 455
I need to use a ConsumerRebalanceListener and found that it can be registered with the containerProperties.setConsumerRebalanceListener method. I need the consumer instance inside the rebalance listener to get the positions of the partitions (consumer.position()) and to override the consumer's fetch offsets (consumer.seek()), but I couldn't find a way to access the consumer instance.
Edit: I raised a GitHub issue for this: https://github.com/spring-projects/spring-kafka/issues/304
Upvotes: 1
Views: 1947
Reputation: 121552
For that purpose, your target listener has to implement ConsumerSeekAware.
The first method, registerSeekCallback, is called when the container is started; this callback should be used when seeking at some arbitrary time after initialization. You should save a reference to the callback; if you are using the same listener in multiple containers (or in a ConcurrentMessageListenerContainer), you should store the callback in a ThreadLocal or some other structure keyed by the listener Thread.
When using group management, the second method, onPartitionsAssigned, is called when assignments change. You can use this method, for example, to set initial offsets for the partitions by calling the callback; you must use the callback argument, not the one passed into registerSeekCallback. This method will never be called if you explicitly assign partitions yourself; use TopicPartitionInitialOffset in that case.
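To make the two callbacks concrete, here is a minimal sketch of a listener that rewinds each newly assigned partition by a fixed number of records. So that it runs without Spring Kafka on the classpath, TopicPartition, ConsumerSeekCallback and ConsumerSeekAware are declared here as local stand-ins that mirror the method shapes as I recall them from Spring Kafka 1.x; the real interface declares further methods that are omitted. RewindingListener, the REWIND constant and demo() are hypothetical names used only for illustration, and the sketch assumes the map passed to onPartitionsAssigned holds the current position of each partition.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SeekAwareSketch {

    // Local stand-in for org.apache.kafka.common.TopicPartition (assumption).
    static final class TopicPartition {
        final String topic;
        final int partition;

        TopicPartition(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }
    }

    // Local stand-in for ConsumerSeekAware.ConsumerSeekCallback (assumption).
    interface ConsumerSeekCallback {
        void seek(String topic, int partition, long offset);
    }

    // Local stand-in for ConsumerSeekAware, reduced to the two methods discussed.
    interface ConsumerSeekAware {
        void registerSeekCallback(ConsumerSeekCallback callback);

        void onPartitionsAssigned(Map<TopicPartition, Long> assignments,
                                  ConsumerSeekCallback callback);
    }

    // Hypothetical listener: rewinds every assigned partition by REWIND records.
    static final class RewindingListener implements ConsumerSeekAware {
        static final long REWIND = 10L;

        // One callback per listener thread, for seeks made after initialization.
        private final ThreadLocal<ConsumerSeekCallback> saved = new ThreadLocal<>();

        @Override
        public void registerSeekCallback(ConsumerSeekCallback callback) {
            saved.set(callback);
        }

        @Override
        public void onPartitionsAssigned(Map<TopicPartition, Long> assignments,
                                         ConsumerSeekCallback callback) {
            // Use the callback argument here, not the one saved above.
            for (Map.Entry<TopicPartition, Long> e : assignments.entrySet()) {
                long target = Math.max(0L, e.getValue() - REWIND);
                callback.seek(e.getKey().topic, e.getKey().partition, target);
            }
        }
    }

    // Simulates what a container would do on a rebalance and records the seeks.
    public static List<String> demo() {
        List<String> seeks = new ArrayList<>();
        ConsumerSeekCallback recorder =
                (topic, partition, offset) -> seeks.add(topic + "-" + partition + "@" + offset);

        Map<TopicPartition, Long> positions = new LinkedHashMap<>();
        positions.put(new TopicPartition("orders", 0), 25L);
        positions.put(new TopicPartition("orders", 1), 4L);

        RewindingListener listener = new RewindingListener();
        listener.registerSeekCallback(recorder);
        listener.onPartitionsAssigned(positions, recorder);
        return seeks;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [orders-0@15, orders-1@0]
    }
}
```

In a real application the listener would implement the library's own ConsumerSeekAware and the container would supply the callback. The point of the sketch is that the positions and the ability to seek both arrive through the callback methods, so no direct reference to the consumer is needed.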
Upvotes: 1