Reputation: 413
I'm using the @KafkaListener annotation to consume topics in my application. My issue is that if I create a new topic in Kafka while my consumer is already running, the consumer does not seem to pick up the new topic, even if it matches the topicPattern I'm using. Is there a way to "refresh" the subscribed topics periodically, so that new topics are picked up and rebalanced across my running consumers?
I'm using Spring Kafka 1.2.2 with Kafka 0.10.2.0.
Regards
Upvotes: 4
Views: 5847
Reputation: 31
Actually it is possible.
It worked for me with Kafka 1.1.1.
Under the hood, Spring uses consumer.subscribe(topicPattern), so whether messages from a new topic are seen by the consumer depends entirely on the Kafka client library.
There is a consumer config property called metadata.max.age.ms, which is 5 minutes by default. It controls how often the client asks the broker for metadata updates, meaning new topics may not be seen by the consumer for up to 5 minutes. You can decrease this value (e.g. to 20 seconds) and should see the KafkaListener start picking up messages from new topics sooner.
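As a rough illustration of that setting, here is a minimal sketch of the consumer properties map with a shorter metadata refresh interval. The class name, method name, broker address and group id are made up for the example; only the property keys are real Kafka consumer config names. The sketch assumes you pass the resulting map to whatever consumer factory your listener container factory already uses (for example a DefaultKafkaConsumerFactory).

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: builds consumer properties with a shorter metadata refresh interval.
class ConsumerMetadataConfig {

    static Map<String, Object> consumerProps(String bootstrapServers, String groupId, long metadataMaxAgeMs) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        // Default is 300000 ms (5 minutes); a lower value makes the client refresh
        // topic metadata more often, so pattern subscriptions notice new topics sooner.
        props.put("metadata.max.age.ms", metadataMaxAgeMs);
        return props;
    }

    public static void main(String[] args) {
        // Placeholder broker address and group id, 20 seconds refresh interval.
        Map<String, Object> props = consumerProps("localhost:9092", "demo-group", 20_000L);
        System.out.println(props.get("metadata.max.age.ms"));
    }
}
```

A shorter interval means more metadata requests to the brokers, so a value in the tens of seconds is probably a more reasonable trade-off than a very small one.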
Upvotes: 3
Reputation: 753
The following way works well for me.
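    ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
    // Set the listener before creating the container (same order as the referenced docs);
    // some spring-kafka versions copy the properties when the container is constructed.
    containerProps.setMessageListener(new MessageListener<Integer, String>() {
        @Override
        public void onMessage(ConsumerRecord<Integer, String> message) {
            logger.info("received: " + message);
        }
    });
    // createContainer(...) is the helper from the referenced docs; it builds the
    // container from a consumer factory and these properties.
    KafkaMessageListenerContainer<Integer, String> container = createContainer(containerProps);
    container.setBeanName("testAuto");
    container.start();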
ref: http://docs.spring.io/spring-kafka/docs/1.0.0.RC1/reference/htmlsingle/
In a practical application, I use a ConcurrentMessageListenerContainer instead of the single-threaded KafkaMessageListenerContainer.
Upvotes: 1
Reputation: 174494
You can't dynamically add topics at runtime; you have to stop/start the container to start listening to new topics.
You can inject the KafkaListenerEndpointRegistry with @Autowired and stop/start listeners by id.
You can also stop/start all listeners by calling stop()/start() on the registry itself.
Upvotes: 5