Vikram

Reputation: 655

Consuming messages in parallel from ActiveMQ

When I post a message to the queue for the first time, it gets picked up without any issue. But when I drop a second file, its message stays in the "pending" state for the duration of the thread sleep (2 minutes). To test whether concurrency works in ActiveMQ, I added a bean called ThreadService.

I have the following code in JMSConfig.java:

@Bean
public ActiveMQConnectionFactory connectionFactory() {
    ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
    connectionFactory.setBrokerURL("tcp://localhost:61616");
    connectionFactory.setPassword("admin");
    connectionFactory.setUserName("admin");
    connectionFactory.setTrustedPackages(Arrays.asList("com.jms.domain", "java.util"));
    connectionFactory.setMaxThreadPoolSize(1);
    return connectionFactory;
}

@Bean(destroyMethod = "stop", initMethod = "start")
@Primary
public PooledConnectionFactory pooledConnectionFactory(ConnectionFactory connectionFactory) {
    PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory();
    pooledConnectionFactory.setConnectionFactory(connectionFactory);
    pooledConnectionFactory.setMaxConnections(8);
    pooledConnectionFactory.setMaximumActiveSessionPerConnection(10);
    return pooledConnectionFactory;
}

@Bean
public JmsListenerContainerFactory<?> queueListenerFactory(ConnectionFactory connectionFactory, DefaultJmsListenerContainerFactoryConfigurer configurer) {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    configurer.configure(factory, connectionFactory);
    factory.setConcurrency("1-5");
    return factory;
}

CamelRouter.java

from("file://E:/Camel")
    .bean(ThreadService.class)
    .to("activemq:MessageQueue");

ThreadService.java

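public class ThreadService {
    public void process() throws Exception { // method name is illustrative; the original omitted it
        Thread.sleep(120000); // simulate a long-running task (2 minutes)
    }
}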

How can I configure concurrency in ActiveMQ so that messages in the pending state are dequeued in parallel?


Upvotes: 2

Views: 3476

Answers (1)

burki

Reputation: 7025

I am confused, because the subject of your question is about consuming, but your route is producing to ActiveMQ.

Parallel consumers

If you want to consume in parallel from a JMS queue, you normally configure multiple consumers.

If you want to do this for an individual consumer, you can append the concurrentConsumers option to the endpoint URI:

from("activemq:queue:myQueue?concurrentConsumers=5")

If you want to apply this as the default for every consumer, you can configure it in your bean setup:

@Bean
public JmsConfiguration jmsConfiguration() {
    JmsConfiguration jmsConfiguration = new JmsConfiguration();
    jmsConfiguration.setConnectionFactory(pooledConnectionFactory());
    jmsConfiguration.setConcurrentConsumers(5);
    return jmsConfiguration;
}

@Bean(name = "activemq")
public ActiveMQComponent activeMq() {
    ActiveMQComponent activeMQComponent = new ActiveMQComponent();
    activeMQComponent.setConfiguration(jmsConfiguration()); 
    return activeMQComponent;
}
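
To make clear what concurrent consumers actually buy you, here is a minimal plain-Java sketch. It is not Camel or ActiveMQ code: a BlockingQueue stands in for the broker queue, a fixed thread pool stands in for the consumers, and the class and method names (ConcurrentConsumersSketch, peakParallelism) are made up for illustration. The "slow work" is simulated with a latch that holds each consumer's first message until every consumer has one in hand.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration only: no broker involved, just a local queue.
class ConcurrentConsumersSketch {

    /** Drains the queue with the given number of consumers and returns the peak number of messages in flight. */
    static int peakParallelism(int consumers, int messages) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        for (int i = 0; i < messages; i++) {
            queue.add("msg-" + i);
        }

        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        // Stand-in for slow work: a consumer's first message waits until all consumers hold one.
        CountDownLatch allBusy = new CountDownLatch(consumers);

        ExecutorService pool = Executors.newFixedThreadPool(consumers);
        for (int c = 0; c < consumers; c++) {
            pool.submit(() -> {
                while (queue.poll() != null) {
                    peak.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
                    try {
                        allBusy.countDown();
                        allBusy.await(2, TimeUnit.SECONDS);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    } finally {
                        inFlight.decrementAndGet();
                    }
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return peak.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("1 consumer  -> peak " + peakParallelism(1, 3));
        System.out.println("5 consumers -> peak " + peakParallelism(5, 10));
    }
}
```

With a single consumer only one message is ever in flight, so a slow message holds up everything behind it. With five consumers, up to five messages are worked on at the same time, which is the effect concurrentConsumers=5 is meant to have on a real queue.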

Parallel producers

Your JMS producing route has a file consumer, which is single-threaded by definition to avoid processing the same file with multiple consumers.

However, you can make your route multithreaded after the file consumption with the Threads DSL of Camel:

from("file://E:/Camel")
    .threads(5) // continue asynchronous from here with 5 threads
    .bean(ThreadService.class)
    .to("activemq:MessageQueue");

This way the long-running task in ThreadService should no longer block other files, because the route continues asynchronously with 5 threads from the threads statement onward. The file consumer itself stays single-threaded.

But be aware: the threads statement breaks the current transaction. The file consumer hands the message over to a new thread, so if an error occurs later in the route, the file consumer does not see it.
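
The hand-off problem is not specific to Camel. Here is a rough plain-Java sketch of the same situation, assuming a single polling loop that submits work to a pool of 5 threads. Everything in it (ThreadHandOffSketch, the file names, the "bad" prefix that triggers a failure) is hypothetical and only there to show the effect; it does not use the Camel API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration only: a single "poller" thread hands work to 5 worker threads.
class ThreadHandOffSketch {

    /** Returns { failures the poller saw while handing off, failures that actually happened in workers }. */
    static int[] run(List<String> files) throws InterruptedException {
        ExecutorService workers = Executors.newFixedThreadPool(5);
        List<Future<?>> futures = new ArrayList<>();
        int seenByPoller = 0;

        for (String file : files) {
            try {
                // submit() returns immediately; the task runs later on a worker thread
                futures.add(workers.submit(() -> {
                    if (file.startsWith("bad")) {
                        throw new IllegalStateException("cannot process " + file);
                    }
                }));
            } catch (RuntimeException e) {
                seenByPoller++; // not reached for task failures, they happen on another thread
            }
        }

        workers.shutdown();
        workers.awaitTermination(10, TimeUnit.SECONDS);

        // The failures only become visible if someone inspects the futures afterwards.
        int actualFailures = 0;
        for (Future<?> future : futures) {
            try {
                future.get();
            } catch (ExecutionException e) {
                actualFailures++;
            }
        }
        return new int[] { seenByPoller, actualFailures };
    }

    public static void main(String[] args) throws InterruptedException {
        int[] result = run(Arrays.asList("a.txt", "bad.txt", "c.txt"));
        System.out.println("seen by poller: " + result[0] + ", actual failures: " + result[1]);
    }
}
```

The polling thread has already moved on to the next file by the time a worker fails, so from its point of view every hand-off succeeded. That is the same reason the file consumer cannot roll back or move the file to an error folder on its own once the exchange has passed the threads statement.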

Upvotes: 2
