Reputation: 53
My application has multiple queues (the queue names are read from a database), and each queue consumes a large volume of data daily. I therefore need one container and one message listener per queue, so that each queue is served by its own thread. In addition, some queues may be created dynamically, and I need a container assigned to each newly created queue.
My consumer class starts like this:
@Component
public class RequestConsumer implements MessageListener {
And this is the code that creates the message listener container:
@Bean
@Scope(value = "prototype")
public SimpleMessageListenerContainer simpleMessageListenerNotification(
        ConnectionFactory connectionFactory) {
    SimpleMessageListenerContainer simpleMessageListenerContainer =
            new SimpleMessageListenerContainer(connectionFactory);
    RabbitAdmin rabbitAdmin = getRabbitAdmin(connectionFactory);
    RequestConsumer requestConsumer = (RequestConsumer) beanFactory.getBean("requestConsumer");
    simpleMessageListenerContainer.setupMessageListener(requestConsumer);
    simpleMessageListenerContainer.setAutoDeclare(true);
    for (String queueName : requestConsumerQueueList()) {
        Queue queue = new Queue(queueName);
        rabbitAdmin.declareQueue(queue);
        simpleMessageListenerContainer.addQueues(queue);
    }
    simpleMessageListenerContainer.start();
    return simpleMessageListenerContainer;
}
My current code creates only one container with one message listener for all the queues, whereas I expect a separate container for each queue.
Upvotes: 0
Views: 909
Reputation: 174729
First, you should not be declaring queues in a bean definition - it is too early in the context's lifecycle.
You should also not be calling start() in the bean definition - again, too early.
You should do something like this:
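import java.util.Arrays;

import org.springframework.amqp.core.Declarables;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Scope;

@SpringBootApplication
public class So56951298Application {

    public static void main(String[] args) {
        SpringApplication.run(So56951298Application.class, args);
    }

    // Queues are only described here; the framework declares them on the broker.
    @Bean
    public Declarables queues() {
        return new Declarables(Arrays.asList(new Queue("q1"), new Queue("q2")));
    }

    // Prototype scope: each call produces a new container for the given queue.
    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
            Queue queue) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.setQueues(queue);
        container.setMessageListener(msg -> System.out.println(msg));
        return container;
    }

    // Runs after the context is ready: create and start one container per queue.
    @Bean
    public ApplicationRunner runner(ConnectionFactory connectionFactory, Declarables queues) {
        return args -> {
            queues.getDeclarables().forEach(dec -> container(connectionFactory, (Queue) dec).start());
        };
    }
}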
The framework will automatically declare the queues at the right time, as long as there is a RabbitAdmin in the application context (which Spring Boot automatically configures).
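For the dynamically created queues mentioned in the question, one possible approach is to keep track of which queues already have a container, so that each queue name gets exactly one. The following is only a minimal plain-Java sketch of that bookkeeping, not Spring AMQP API: `Startable` is a hypothetical stand-in for a listener container (SimpleMessageListenerContainer has start() and stop() methods of the same shape), and `QueueContainerRegistry`, `register`, `unregister` and `queueNames` are illustrative names.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Function;

// Hypothetical stand-in for a listener container (assumption, not a Spring type).
interface Startable {
    void start();
    void stop();
}

// Illustrative registry: holds at most one started container per queue name.
class QueueContainerRegistry {

    private final Map<String, Startable> containers = new HashMap<>();
    private final Function<String, Startable> factory;

    // The factory builds a new, not yet started, container for a queue name.
    QueueContainerRegistry(Function<String, Startable> factory) {
        this.factory = factory;
    }

    // Creates and starts a container unless the queue already has one.
    // Returns true only when a new container was created.
    synchronized boolean register(String queueName) {
        if (containers.containsKey(queueName)) {
            return false;
        }
        Startable container = factory.apply(queueName);
        container.start();
        containers.put(queueName, container);
        return true;
    }

    // Stops and forgets the container for the queue, if there is one.
    synchronized boolean unregister(String queueName) {
        Startable container = containers.remove(queueName);
        if (container == null) {
            return false;
        }
        container.stop();
        return true;
    }

    // Sorted snapshot of the queue names that currently have a container.
    synchronized Set<String> queueNames() {
        return new TreeSet<>(containers.keySet());
    }
}
```

In the application above, the factory could plausibly delegate to the prototype container(...) bean method, and register(...) would be called whenever a new queue name appears in the database. How new names are detected (polling, an event, and so on) is left open here.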
Upvotes: 3