Reputation: 1733
My application processes incoming messages that are placed on a single ActiveMQ queue (named "incoming.queue"). I have a MessageListener which processes the messages, and all is working well. My Java config is below:
@Configuration
@ComponentScan(basePackages = "uk.co.domain")
public class JmsConfig {

    @Bean
    public ActiveMQConnectionFactory connectionFactory() {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
        activeMQConnectionFactory.setBrokerURL("tcp://localhost:61616");
        return activeMQConnectionFactory;
    }

    @Bean
    public DefaultMessageListenerContainer jmsListenerContainer() {
        DefaultMessageListenerContainer dmlc = new DefaultMessageListenerContainer();
        dmlc.setConnectionFactory(connectionFactory());
        dmlc.setDestination(new ActiveMQQueue("incoming.queue"));
        dmlc.setMessageListener(new QueueProcessor());
        dmlc.setConcurrentConsumers(50);
        return dmlc;
    }
}

public class QueueProcessor implements MessageListener {

    @Override
    public void onMessage(Message message) {
        // process the message
    }
}
Another department is making upstream changes such that the messages are going to be spread amongst three different queues, named "high_priority.queue", "med_priority.queue" and "low_priority.queue". The number of concurrent consumers for each queue needs to be 50, 20 and 5 respectively.
Within my code the same QueueProcessor will be responsible for processing the messages, but I am unsure how to modify my config to create three message listeners instead of one. Any advice is appreciated.
Upvotes: 1
Views: 5207
Reputation: 1733
I solved this problem by simply creating multiple beans - didn't think it would be this easy:
@Bean
public DefaultMessageListenerContainer highPriorityQueue() {
    DefaultMessageListenerContainer dmlc = new DefaultMessageListenerContainer();
    dmlc.setConnectionFactory(connectionFactory());
    dmlc.setDestination(new ActiveMQQueue("high_priority.queue"));
    dmlc.setMessageListener(new QueueProcessor());
    dmlc.setConcurrentConsumers(50);
    return dmlc;
}

@Bean
public DefaultMessageListenerContainer medPriorityQueue() {
    DefaultMessageListenerContainer dmlc = new DefaultMessageListenerContainer();
    dmlc.setConnectionFactory(connectionFactory());
    dmlc.setDestination(new ActiveMQQueue("med_priority.queue"));
    dmlc.setMessageListener(new QueueProcessor());
    dmlc.setConcurrentConsumers(20);
    return dmlc;
}

@Bean
public DefaultMessageListenerContainer lowPriorityQueue() {
    DefaultMessageListenerContainer dmlc = new DefaultMessageListenerContainer();
    dmlc.setConnectionFactory(connectionFactory());
    dmlc.setDestination(new ActiveMQQueue("low_priority.queue"));
    dmlc.setMessageListener(new QueueProcessor());
    dmlc.setConcurrentConsumers(5);
    return dmlc;
}
Upvotes: 2
Reputation: 37
You can create a generic processor class whose constructor takes the queue name and the consumer count and sets up the connection, destination and other configuration. Each processor class that extends this generic class passes in its own values. You can then declare these classes as beans in your Spring configuration.
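A rough sketch of the "queue name and consumer count as inputs" idea, assuming the same JmsConfig and QueueProcessor classes from the question. It uses a private helper method instead of a class hierarchy, which is a simpler variation on the suggestion above. The helper name listenerContainer is made up for illustration and is not part of Spring or ActiveMQ. It needs spring-jms and the ActiveMQ client on the classpath.

```java
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.listener.DefaultMessageListenerContainer;

@Configuration
@ComponentScan(basePackages = "uk.co.domain")
public class JmsConfig {

    @Bean
    public ActiveMQConnectionFactory connectionFactory() {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
        factory.setBrokerURL("tcp://localhost:61616");
        return factory;
    }

    // One container bean per queue; only the queue name and consumer count differ.
    @Bean
    public DefaultMessageListenerContainer highPriorityQueue() {
        return listenerContainer("high_priority.queue", 50);
    }

    @Bean
    public DefaultMessageListenerContainer medPriorityQueue() {
        return listenerContainer("med_priority.queue", 20);
    }

    @Bean
    public DefaultMessageListenerContainer lowPriorityQueue() {
        return listenerContainer("low_priority.queue", 5);
    }

    // Hypothetical helper (not a Spring API): builds a container for the given
    // queue with the given number of concurrent consumers.
    private DefaultMessageListenerContainer listenerContainer(String queueName, int consumers) {
        DefaultMessageListenerContainer dmlc = new DefaultMessageListenerContainer();
        dmlc.setConnectionFactory(connectionFactory());
        dmlc.setDestination(new ActiveMQQueue(queueName));
        dmlc.setMessageListener(new QueueProcessor());
        dmlc.setConcurrentConsumers(consumers);
        return dmlc;
    }
}
```

The behaviour should match the three hand-written beans in the accepted answer. The only difference is that a new queue becomes a one-line bean method.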
Upvotes: 0