Reputation: 29290
We have an ActiveMQ / Camel configuration that has previously been using exclusively message queues, with concurrent consumers.
However, we're now introducing message topics and finding that, because of the concurrent consumers, each message received on a topic is consumed multiple times.
What's the correct configuration for this scenario?
That is, we want multiple concurrent consumers for messages received on a queue, but only a single consumer for messages received on a topic.
Here's the current configuration:
<amq:connectionFactory id="amqConnectionFactory"
    useAsyncSend="true" brokerURL="${${ptl.Servername}.jms.cluster.uri}"
    userName="${jms.username}" password="${jms.password}" sendTimeout="1000"
    optimizeAcknowledge="true" disableTimeStampsByDefault="true">
</amq:connectionFactory>

<bean id="cachingConnectionFactory"
    class="org.springframework.jms.connection.CachingConnectionFactory">
  <property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
  <property name="cacheConsumers" value="true"></property>
  <property name="cacheProducers" value="true"></property>
  <property name="reconnectOnException" value="true"></property>
  <property name="sessionCacheSize" value="${jms.sessioncachesize}"></property>
</bean>

<bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
  <property name="connectionFactory" ref="cachingConnectionFactory" />
  <property name="transacted" value="false" />
  <property name="concurrentConsumers" value="${jms.concurrentConsumer}" />
  <property name="maxConcurrentConsumers" value="${jms.max.concurrentConsumer}" />
  <property name="preserveMessageQos" value="true" />
  <property name="timeToLive" value="${jms.timeToLive}" />
</bean>

<bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
  <property name="configuration" ref="jmsConfig" />
</bean>
Upvotes: 2
Views: 5762
Reputation: 21015
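Each concurrent consumer on a topic is a separate subscriber and receives its own copy of every message, which is why you see duplicates. You can explicitly set concurrentConsumers/maxConcurrentConsumers to "1" for any topic consumers: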
from("activemq:topic:myTopic?concurrentConsumers=1&maxConcurrentConsumers=1")...
Alternatively, set the JmsConfiguration concurrentConsumers/maxConcurrentConsumers properties to "1" and then explicitly enable concurrent consumption for queues as needed:
from("activemq:queue:myQueue?maxConcurrentConsumers=5")...
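Since the question uses the Spring XML DSL, the same per-endpoint overrides might look roughly like the sketch below. The route ids and the `myTopicHandler` / `myQueueHandler` bean names are hypothetical, and the routes are assumed to sit inside an existing `<camel:camelContext>`. Note that `&` in a URI has to be written as `&amp;` in XML.

```xml
<!-- Topic: pin to one consumer so each message is processed once -->
<camel:route id="myTopicRoute">
  <camel:from uri="activemq:topic:myTopic?concurrentConsumers=1&amp;maxConcurrentConsumers=1" />
  <camel:to uri="bean:myTopicHandler" />
</camel:route>

<!-- Queue: competing consumers are safe, so concurrency can be raised per endpoint -->
<camel:route id="myQueueRoute">
  <camel:from uri="activemq:queue:myQueue?maxConcurrentConsumers=5" />
  <camel:to uri="bean:myQueueHandler" />
</camel:route>
```

Options given on the endpoint URI take precedence over the values inherited from the component's JmsConfiguration, so either default can be overridden route by route.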
Also, you can use Virtual Topics to consume topic messages concurrently without getting duplicates (highly recommended over traditional topics).
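As a rough illustration of the Virtual Topic approach, assuming the broker's default virtual-topic naming (`VirtualTopic.*` for the topic, `Consumer.<name>.VirtualTopic.*` for the backing queues): the producer publishes to the topic, and each logical subscriber reads from its own queue, where concurrent consumers compete for messages instead of each receiving a copy. The subscriber name `A`, the route ids, the `direct:publish` endpoint, and the `myTopicHandler` bean are hypothetical.

```xml
<!-- Producer side: publish to the virtual topic as if it were a normal topic -->
<camel:route id="virtualTopicPublisher">
  <camel:from uri="direct:publish" />
  <camel:to uri="activemq:topic:VirtualTopic.myTopic" />
</camel:route>

<!-- Consumer side: read from the queue the broker feeds for subscriber "A".
     Concurrent consumers on a queue share the messages, so no duplicates. -->
<camel:route id="virtualTopicConsumerA">
  <camel:from uri="activemq:queue:Consumer.A.VirtualTopic.myTopic?concurrentConsumers=5" />
  <camel:to uri="bean:myTopicHandler" />
</camel:route>
```

A second logical subscriber would consume from a differently named queue (for example `Consumer.B.VirtualTopic.myTopic`) and would receive its own copy of each message.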
Upvotes: 3
Reputation: 29290
The solution I ended up using was to create a separate jmsConfig/activeMQ config block.
The complete configuration looks as follows:
<!-- This is appropriate for consuming Queues, but not topics. For topics, use
     jmsTopicConfig / activemqTopics -->
<bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
  <property name="connectionFactory" ref="cachingConnectionFactory" />
  <property name="transacted" value="false" />
  <property name="concurrentConsumers" value="${jms.concurrentConsumer}" />
  <property name="maxConcurrentConsumers" value="${jms.max.concurrentConsumer}" />
  <property name="preserveMessageQos" value="true" />
  <property name="timeToLive" value="${jms.timeToLive}" />
</bean>

<bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
  <property name="configuration" ref="jmsConfig" />
</bean>

<!-- This config limits to a single concurrent consumer. This config is appropriate for
     consuming Topics, not Queues. -->
<bean id="jmsTopicConfig" class="org.apache.camel.component.jms.JmsConfiguration">
  <property name="connectionFactory" ref="cachingConnectionFactory" />
  <property name="transacted" value="false" />
  <property name="concurrentConsumers" value="1" />
  <property name="maxConcurrentConsumers" value="1" />
  <property name="preserveMessageQos" value="true" />
  <property name="timeToLive" value="${jms.timeToLive}" />
</bean>

<bean id="activemqTopics" class="org.apache.activemq.camel.component.ActiveMQComponent">
  <property name="configuration" ref="jmsTopicConfig" />
</bean>
Then, in the Camel route, consume the topic through the activemqTopics component rather than activemq, as follows:
<camel:route id="myTopicResponder">
  <camel:from uri="activemqTopics:topic:stockQuotes?concurrentConsumers=1" />
  <camel:to uri="bean:stockQuoteResponder?method=saveStockQuote"/>
</camel:route>
Upvotes: 0