Bruno René Santos
Bruno René Santos

Reputation: 413

Spring Kafka - Subscribe new topics during runtime

I'm using the annotation @KafkaListener to consume topics on my application. My issue is that if I create a new topic in kafka but my consumer is already running, it seems the consumer will not pick up the new topic, even if it matches with the topicPattern I'm using. Is there a way to "refresh" the subscribed topics periodically, so that new topics are picked up and rebalanced upon my running consumers?

I'm using Spring Kafka 1.2.2 with Kafka 0.10.2.0.

Regards

Upvotes: 4

Views: 5847

Answers (3)

Sergey Tityenok
Sergey Tityenok

Reputation: 31

Actually it is possible.

It worked for me with Kafka 1.1.1.

Under the hood Spring uses consumer.subscribe(topicPattern) and now it is totally depends on Kafka lib whether the message will be seen by consumer.

There is consumer config property called metadata.max.age.ms which is 5 mins by default. It basically controls how often client will go to broker for the updates, meaning new topics will not be seen by consumer for up to 5 minutes. You can decrease this value (e.g. 20 seconds) and should see KafkaListener started to pick messages from new topics quicker.

Upvotes: 3

DanielJyc
DanielJyc

Reputation: 753

The following way works well for me.

ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
KafkaMessageListenerContainer<Integer, String> container = createContainer(containerProps);
containerProps.setMessageListener(new MessageListener<Integer, String>() {

    @Override
    public void onMessage(ConsumerRecord<Integer, String> message) {
        logger.info("received: " + message);
    }

});
container.setBeanName("testAuto");
container.start();

ref: http://docs.spring.io/spring-kafka/docs/1.0.0.RC1/reference/htmlsingle/

In practical application, I use a ConcurrentMessageListenerContainer instead of single-threaded KafkaMessageListenerContainer.

Upvotes: 1

Gary Russell
Gary Russell

Reputation: 174494

You can't dynamically add topics at runtime; you have to stop/start the container to start listening to new topics.

You can @Autowire the KafkaListenerEndpointRegistry and stop/start listeners by id.

You can also stop/start all listeners by calling stop()/start() on the registry itself.

Upvotes: 5

Related Questions