Reputation: 9827
I'm using Spring JMS and ActiveMQ. A client pushes messages to a queue, and multiple consumer threads listen on that queue and remove messages from it. Sometimes the same message is dequeued by two consumers. I don't want this behavior; I want to ensure that each message is processed by exactly one consumer thread. Any ideas on where I've gone wrong?
Spring 3.2.2 Config:
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:mvc="http://www.springframework.org/schema/mvc" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:tx="http://www.springframework.org/schema/tx" xmlns:util="http://www.springframework.org/schema/util"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-3.0.xsd
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.0.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-2.0.xsd">
<context:annotation-config />
<context:component-scan base-package="com.myapp" />
<!-- JMS ConnectionFactory config Starts -->
<bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL">
<value>${brokerURL}</value>
</property>
<property name="userName" value="${username}" />
<property name="password" value="${password}" />
</bean>
<bean id="pooledJmsConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
init-method="start" destroy-method="stop">
<property name="connectionFactory" ref="jmsConnectionFactory" />
</bean>
<!-- JMS ConnectionFactory config Ends -->
<!-- JMS Template config Starts -->
<bean id="myQueue" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="${activemq.consumer.destinationName}" />
</bean>
<bean id="myQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="pooledJmsConnectionFactory" />
</bean>
<!-- JMS Template config Ends -->
<!-- JMS Listener config starts -->
<bean id="simpleMessageConverter"
class="org.springframework.jms.support.converter.SimpleMessageConverter" />
<bean id="myContainer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="concurrentConsumers" value="${threadcount}" />
<property name="connectionFactory" ref="pooledJmsConnectionFactory" />
<property name="destination" ref="myQueue" />
<property name="messageListener" ref="myListener" />
<property name="messageSelector" value="JMSType = 'New'" />
</bean>
<bean id="myListener"
class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<constructor-arg>
<bean class="myapp.MessageListener" />
</constructor-arg>
<property name="defaultListenerMethod" value="receive" />
<property name="messageConverter" ref="simpleMessageConverter" />
</bean>
<!-- JMS Listener config Ends -->
<!-- enable the configuration of transactional behavior based on annotations -->
<bean id="myJMSMessageSender" class="myapp.JMSMessageSender">
<property name="jmsTemplate" ref="myQueueTemplate" />
<property name="jmsQueue" ref="myQueue" />
<property name="messageConverter" ref="simpleMessageConverter" />
</bean>
</beans>
ActiveMQ 5.9.1 config:
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="instance8161" dataDirectory="${activemq.data}" persistent="false">
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic=">">
<!-- The constantPendingMessageLimitStrategy is used to prevent
slow topic consumers to block producers and affect other consumers
by limiting the number of messages that are retained
For more information, see:
http://activemq.apache.org/slow-consumer-handling.html
-->
<pendingMessageLimitStrategy>
<constantPendingMessageLimitStrategy limit="1000"/>
</pendingMessageLimitStrategy>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
... <!-- rest is default ActiveMQ Config -->
</broker>
Upvotes: 1
Views: 6238
Reputation: 174484
Most likely, your myapp.MessageListener (or one of its dependencies) is not thread-safe and you are seeing cross-talk across the consumer threads.
Best practice is to make your listener stateless (no mutable fields in the class). If that's not possible, you need to protect shared variables with locks.
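As a rough illustration of both suggestions, here is a minimal sketch. The MessageListener class below is a hypothetical stand-in, not your actual myapp.MessageListener, and the thread pool only imitates a container calling one shared listener instance from several consumer threads; no JMS classes are involved. Per-message work stays in local variables, and the single field that has to be shared is guarded by a lock.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class ThreadSafeListenerSketch {

    /** Hypothetical stand-in for myapp.MessageListener; the real class is not shown in the question. */
    static final class MessageListener {
        // The only field is state that genuinely must be shared, so every access goes through a lock.
        private final Object lock = new Object();
        private final List<String> handled = new ArrayList<>();

        /** Method name matches defaultListenerMethod="receive" from the config. */
        public void receive(String payload) {
            // Per-message work uses local variables only, so threads cannot see each other's data.
            String result = payload.trim().toUpperCase();
            synchronized (lock) {
                handled.add(result);
            }
        }

        public int handledCount() {
            synchronized (lock) {
                return handled.size();
            }
        }
    }

    /** Calls one shared listener from several threads, imitating concurrentConsumers > 1. */
    static int runConcurrently(int threads, int messagesPerThread) {
        MessageListener listener = new MessageListener();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            final int id = t;
            pool.execute(() -> {
                for (int i = 0; i < messagesPerThread; i++) {
                    listener.receive(" msg-" + id + "-" + i + " ");
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return listener.handledCount();
    }

    public static void main(String[] args) {
        // 4 threads x 1000 messages each
        System.out.println(runConcurrently(4, 1000));
    }
}
```

If the listener instead kept the payload or an intermediate result in an instance field between steps, two threads could overwrite each other's value, which can look like the same message being processed twice.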
Upvotes: 3