Kailas Andhale

Reputation: 53

dynamic container creation in spring-rabbitmq per queue

My application has multiple queues (the queue names are read from a database), and each queue receives a large volume of data daily. I therefore need one container and one message listener per queue, so that each queue is consumed on its own thread. In addition, some queues may be created dynamically at runtime, and each newly created queue needs a container assigned to it as well.

My consumer class starts like this:

@Component
public class RequestConsumer implements MessageListener {

And this is how I create the message listener container:

    @Bean
    @Scope(value = "prototype")
    public SimpleMessageListenerContainer simpleMessageListenerNotification(
            ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer simpleMessageListenerContainer =
                new SimpleMessageListenerContainer(connectionFactory);
        RabbitAdmin rabbitAdmin = getRabbitAdmin(connectionFactory);
        RequestConsumer requestConsumer = (RequestConsumer) beanFactory.getBean("requestConsumer");
        simpleMessageListenerContainer.setupMessageListener(requestConsumer);
        simpleMessageListenerContainer.setAutoDeclare(true);
        for (String queueName : requestConsumerQueueList()) {
            Queue queue = new Queue(queueName);
            rabbitAdmin.declareQueue(queue);
            simpleMessageListenerContainer.addQueues(queue);
        }
        simpleMessageListenerContainer.start();
        return simpleMessageListenerContainer;
    }

My current code creates only one container with one message listener for all the queues, whereas I expect a separate container for each queue.

Upvotes: 0

Views: 909

Answers (1)

Gary Russell

Reputation: 174729

First, you should not be declaring queues in a bean definition - it is too early in the context's lifecycle.

You should also not be calling start() in the bean definition - again, too early.

You should do something like this:

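import java.util.Arrays;

import org.springframework.amqp.core.Declarables;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Scope;

@SpringBootApplication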
public class So56951298Application {

    public static void main(String[] args) {
        SpringApplication.run(So56951298Application.class, args);
    }

    @Bean
    public Declarables queues() {
        return new Declarables(Arrays.asList(new Queue("q1"), new Queue("q2")));
    }

    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
            Queue queue) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.setQueues(queue);
        container.setMessageListener(msg -> System.out.println(msg));
        return container;
    }

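    // Runs once the context is fully started: each call to container(...) returns
    // a new prototype instance bound to one queue, which is then started.
    @Bean
    public ApplicationRunner runner(ConnectionFactory connectionFactory, Declarables queues) {
        return args -> {
            queues.getDeclarables().forEach(dec -> container(connectionFactory, (Queue) dec).start());
        };
    }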

}

The framework will automatically declare the queues at the right time, as long as there is a RabbitAdmin in the application context (Spring Boot configures one automatically).
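
For queues that appear later at runtime, one possible approach is to keep a small registry that creates and starts a container the first time a queue name is seen. The following is only a sketch in plain Java, not part of the code above: QueueContainer and QueueContainerRegistry are hypothetical names, and QueueContainer merely stands in for the start/stop part of a real listener container so the idea can be shown without Spring on the classpath.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical stand-in for the start/stop part of a listener container. */
interface QueueContainer {
    void start();
    void stop();
}

/** Hypothetical registry that keeps at most one started container per queue name. */
final class QueueContainerRegistry {

    private final Map<String, QueueContainer> containers = new ConcurrentHashMap<>();
    private final Function<String, QueueContainer> factory;

    QueueContainerRegistry(Function<String, QueueContainer> factory) {
        this.factory = factory;
    }

    /** Creates and starts a container for the queue unless one already exists. */
    QueueContainer register(String queueName) {
        return containers.computeIfAbsent(queueName, name -> {
            QueueContainer container = factory.apply(name);
            container.start();
            return container;
        });
    }

    /** Stops and forgets the container; returns false if none was registered. */
    boolean unregister(String queueName) {
        QueueContainer container = containers.remove(queueName);
        if (container == null) {
            return false;
        }
        container.stop();
        return true;
    }

    /** Number of queues that currently have a container. */
    int size() {
        return containers.size();
    }
}
```

In the application above, the factory passed to the registry would presumably declare the new queue through the RabbitAdmin and then wrap a container obtained from the prototype container(...) bean. That wiring is an assumption and is not shown here.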

Upvotes: 3
