Reputation: 601
Maybe this is a naive question but it somhow stuck me there for some time. Please bear with me.
I have a class DataConsumer.java that implements ConsumerAwareRebalanceListener
:
@Component
public class DataConsumer implements ConsumerAwareRebalanceListener {
@Override
public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
// seek offsets based on a given timestamp
}
@KafkaListener(topics = "dataTopic", containerFactory = "kafkaListenerContainerFactory")
receive(ConsumerRecord payload) {}
}
So in order for onPartitionsAssigned
to work, i need to call setConsumerRebalanceListener
in the kafkaListenerContainerFactory
method which is defined in another class like this:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, ConsumerRecord> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, ConsumerRecord> factory = new ConcurrentKafkaListenerContainerFactory();
factory.getContainerProperties().setConsumerRebalanceListener(____________);
// rest part omitted
}
My question is about this ____________
part above. What shall i put there?
In my understanding, method kafkaListenerContainerFactory
is called when we initialize the @KafkaListener
container in DataConsumer class, so meaning there is already an existing DataConsumer instance to hold the @kafkaLister
. How can i pass that already existing DataConsumer instance to setConsumerRebalanceListener
function?
All the sample code snippets I can search out are like below:
setConsumerRebalanceListener(new ConsumerRebalanceListener() {
//override the functions
})
But isn't this creating a new instance? If I put new DataConsumer()
it will lose some status in the existing instance (e.g. the timestamp to seek offsets) so this can't work.
Upvotes: 1
Views: 1030
Reputation: 174759
You can declare DataConsumer
as a @Bean
(instead of using @Component
) then you can inject your bean there.
However, this is the wrong mechanism to use in this case.
Implement ConsumerSeekAware
instead - the container will automatically detect that your listener implements that and will call its onPartitionsAssigned
.
See Seeking to a Specific Offset in the documentation.
Upvotes: 1