Ben Mathews

Reputation: 2998

How to concurrently process Spring Integration JMS channels

I've got a JMS connectionFactory that a number of Spring Integration JMS inbound-gateways use. They work fine, but each one processes only one message at a time. I'd like them to handle N messages concurrently, on different threads.

The code I've got now is as follows.

spring-config.xml

<import resource="commons/jmsConnectionFactory.xml"/>
<import resource="chain/chain1.xml"/>
<import resource="chain/chain2.xml"/>

jmsConnectionFactory.xml

<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL">
        <value>failover:(tcp://mqmaster:61616,tcp://mqslave:61616)?jms.prefetchPolicy.all=1&amp;randomize=false</value>
    </property>
</bean>

The chains all look something like this:

<int:channel id="fooChannel"/>
<int-jms:inbound-gateway request-channel="fooChannel" request-destination-name="foo" extract-request-payload="true" />
<int:chain input-channel="fooChannel">
  <int-http:outbound-gateway
     url="...."
     http-method="GET" 
     extract-request-payload="true" />
  <int:object-to-string-transformer />
</int:chain>

I know that I can add "concurrent-consumers" and "max-concurrent-consumers" to the inbound-gateway to make the gateway process multiple messages. However, each chain/gateway would then manage its own threads independently. I would like some way to define a common thread pool for everything that uses the JMS connection. That would let me specify, say, 5 threads and have them allocated among the gateways according to which messages the server is supplying, while limiting the consuming server to a manageable amount of work.

What modifications are needed to do the processing with multiple threads? And how do I limit the number of threads?

Upvotes: 2

Views: 6122

Answers (2)

Ben Mathews

Reputation: 2998

As hoaz suggested, a taskExecutor is the solution.

<task:executor id="taskExecutor" pool-size="4"/>

and

<int:channel id="fooChannel">
  <int:dispatcher task-executor="taskExecutor"/>
</int:channel>
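
For context, here is a minimal sketch of how the same executor could be shared by more than one chain, including the namespace declarations the two snippets above rely on. The second channel, barChannel, is a hypothetical name used only for illustration; the pool size is the one from the answer.

```xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:task="http://www.springframework.org/schema/task"
       xsi:schemaLocation="
         http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
         http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
         http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">

  <!-- A single pool of 4 threads, referenced by every channel below -->
  <task:executor id="taskExecutor" pool-size="4"/>

  <!-- Channel from the question, now dispatching on the shared pool -->
  <int:channel id="fooChannel">
    <int:dispatcher task-executor="taskExecutor"/>
  </int:channel>

  <!-- Hypothetical second channel (not in the original question) using the same pool -->
  <int:channel id="barChannel">
    <int:dispatcher task-executor="taskExecutor"/>
  </int:channel>

</beans>
```

Because both dispatchers point at the same bean, the 4 threads are shared between the chains rather than allocated per channel.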

Upvotes: 1

hoaz

Reputation: 10143

The inbound gateway is an extension of the message-driven channel adapter, so you can use all the properties that DefaultMessageListenerContainer provides. One of them sets the number of concurrent consumers the container uses:

<int-jms:inbound-gateway
    request-channel="fooChannel"
    request-destination-name="foo"
    extract-request-payload="true"
    concurrent-consumers="10" />

You can find more information here.

Update: It appears you can pass a reference to your own container implementation to the inbound-gateway using the container attribute. That way you can reuse a single task executor instance across your containers and gateways:

<int-jms:inbound-gateway
    request-channel="fooChannel"
    request-destination-name="foo"
    extract-request-payload="true"
    container="fooChannelContainer" />

<bean id="fooChannelContainer" class="org.springframework.jms.listener.SimpleMessageListenerContainer" destroy-method="destroy">
    ...
    <property name="taskExecutor" ref="jmsTaskExecutor" />
</bean>
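
The jmsTaskExecutor bean referenced above is not defined in the snippet. As a rough sketch, it could be declared once and shared by several containers as shown below. This assumes the task namespace (xmlns:task="http://www.springframework.org/schema/task") is declared on the enclosing beans element; barChannelContainer and the "bar" destination are hypothetical names for a second chain, and the pool size of 5 simply mirrors the limit mentioned in the question.

```xml
<!-- Shared pool; 5 threads is the example limit from the question -->
<task:executor id="jmsTaskExecutor" pool-size="5"/>

<!-- Hypothetical second container (illustration only) reusing the same executor.
     connectionFactory is the bean defined in jmsConnectionFactory.xml. -->
<bean id="barChannelContainer" class="org.springframework.jms.listener.SimpleMessageListenerContainer" destroy-method="destroy">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="destinationName" value="bar" />
    <property name="taskExecutor" ref="jmsTaskExecutor" />
</bean>
```

With SimpleMessageListenerContainer, the task executor is used to run the listener once the provider has delivered a message, so containers that reference the same executor draw on one common set of threads.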

Upvotes: 1
