smilin_stan

Reputation: 1733

Consume multiple ActiveMQ queues via the same ActiveMQConnectionFactory

My application processes incoming messages that are placed on a single ActiveMQ queue (named "incoming.queue"). I have a MessageListener which processes the messages, and all is working well. My Java config is below:

@Configuration
@ComponentScan(basePackages="uk.co.domain")
public class JmsConfig {

    @Bean
    public ActiveMQConnectionFactory connectionFactory() {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
        activeMQConnectionFactory.setBrokerURL("tcp://localhost:61616");
        return activeMQConnectionFactory;
    }

    @Bean
    public DefaultMessageListenerContainer jmsListenerContainer() {
        DefaultMessageListenerContainer dmlc = new DefaultMessageListenerContainer();
        dmlc.setConnectionFactory(connectionFactory());
        dmlc.setDestination(new ActiveMQQueue("incoming.queue"));
        dmlc.setMessageListener(new QueueProcessor());
        dmlc.setConcurrentConsumers(50);
        return dmlc;
    }
}



public class QueueProcessor implements MessageListener {

    public void onMessage(Message message) {

        // process the message
    }
}

Another department is making upstream changes such that the messages are going to be spread amongst three different queues, named "high_priority.queue", "med_priority.queue" and "low_priority.queue". The number of concurrent consumers for each queue needs to be 50, 20 and 5 respectively.

Within my code the same QueueProcessor will be responsible for processing the messages, but I am unsure how to modify my config to create the three message listeners instead of one. Any advice is appreciated.

Upvotes: 1

Views: 5207

Answers (2)

smilin_stan

Reputation: 1733

I solved this problem by simply creating multiple beans - didn't think it would be this easy:

@Bean
public DefaultMessageListenerContainer highPriorityQueue() {
    DefaultMessageListenerContainer dmlc = new DefaultMessageListenerContainer();
    dmlc.setConnectionFactory(connectionFactory());
    dmlc.setDestination(new ActiveMQQueue("high_priority.queue"));
    dmlc.setMessageListener(new QueueProcessor());
    dmlc.setConcurrentConsumers(50);
    return dmlc;
}

@Bean
public DefaultMessageListenerContainer medPriorityQueue() {
    DefaultMessageListenerContainer dmlc = new DefaultMessageListenerContainer();
    dmlc.setConnectionFactory(connectionFactory());
    dmlc.setDestination(new ActiveMQQueue("med_priority.queue"));
    dmlc.setMessageListener(new QueueProcessor());
    dmlc.setConcurrentConsumers(20);
    return dmlc;
}

@Bean
public DefaultMessageListenerContainer lowPriorityQueue() {
    DefaultMessageListenerContainer dmlc = new DefaultMessageListenerContainer();
    dmlc.setConnectionFactory(connectionFactory());
    dmlc.setDestination(new ActiveMQQueue("low_priority.queue"));
    dmlc.setMessageListener(new QueueProcessor());
    dmlc.setConcurrentConsumers(5);
    return dmlc;
}

Upvotes: 2

erdem karayer

Reputation: 37

You can create a generic processor class whose constructor takes the queue name and the consumer count, and which sets up the connection, destination and other configuration from those inputs. Each queue's processor class then extends this generic class and passes in its own values. You can also declare these classes as beans in your Spring configuration.
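One way to read this suggestion, sketched with a private helper method rather than a class hierarchy: the queue name and consumer count become parameters, and each bean method only supplies its own values. The class name PriorityQueueJmsConfig and the helper containerFor are hypothetical names chosen for illustration; QueueProcessor, the broker URL and the queue names are taken from the question. This is a minimal sketch, not tested against a running broker.

```java
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.listener.DefaultMessageListenerContainer;

@Configuration
public class PriorityQueueJmsConfig { // hypothetical class name

    @Bean
    public ActiveMQConnectionFactory connectionFactory() {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
        factory.setBrokerURL("tcp://localhost:61616");
        return factory;
    }

    // Hypothetical helper: builds one listener container for the given
    // queue name and number of concurrent consumers.
    private DefaultMessageListenerContainer containerFor(String queueName, int consumers) {
        DefaultMessageListenerContainer dmlc = new DefaultMessageListenerContainer();
        dmlc.setConnectionFactory(connectionFactory());
        dmlc.setDestination(new ActiveMQQueue(queueName));
        // QueueProcessor is the MessageListener from the question.
        dmlc.setMessageListener(new QueueProcessor());
        dmlc.setConcurrentConsumers(consumers);
        return dmlc;
    }

    @Bean
    public DefaultMessageListenerContainer highPriorityQueue() {
        return containerFor("high_priority.queue", 50);
    }

    @Bean
    public DefaultMessageListenerContainer medPriorityQueue() {
        return containerFor("med_priority.queue", 20);
    }

    @Bean
    public DefaultMessageListenerContainer lowPriorityQueue() {
        return containerFor("low_priority.queue", 5);
    }
}
```

Each container is still a separate bean, so Spring starts and stops the three listeners independently. Only the repeated setup code is shared.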

Upvotes: 0
