c12
c12

Reputation: 9827

ActiveMQ Concurrency Issue - Multiple Consumers Consuming the Same Message From Queue

I'm using Spring JMS and ActiveMQ where I have a client that pushes messages to a Queue and I have multiple consumer threads that are listening and removing messages from the Queue. Some of the time the same messages gets dequeued from the Queue by two consumers. I don't want this behavior and want to ensure the only one messages is processed by only one consumer thread. Any ideas on where I've gone wrong?

Spring 3.2.2 Config:

<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:mvc="http://www.springframework.org/schema/mvc" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:tx="http://www.springframework.org/schema/tx" xmlns:util="http://www.springframework.org/schema/util"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans     
        http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
        http://www.springframework.org/schema/context 
        http://www.springframework.org/schema/context/spring-context-3.0.xsd
        http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.0.xsd
        http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-2.0.xsd">
    <context:annotation-config />
    <context:component-scan base-package="com.myapp" />

    <!-- JMS ConnectionFactory config Starts -->
    <bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL">
            <value>${brokerURL}</value>
        </property>
        <property name="userName" value="${username}" />
        <property name="password" value="${password}" />
    </bean>

    <bean id="pooledJmsConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
        init-method="start" destroy-method="stop">
        <property name="connectionFactory" ref="jmsConnectionFactory" />
    </bean>
    <!-- JMS ConnectionFactory config Ends -->

    <!-- JMS Template config Starts -->
    <bean id="myQueue" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg value="${activemq.consumer.destinationName}" />
    </bean>

    <bean id="myQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="pooledJmsConnectionFactory" />
    </bean>
    <!-- JMS Template config Ends -->

    <!-- JMS Listener config starts -->
    <bean id="simpleMessageConverter"
        class="org.springframework.jms.support.converter.SimpleMessageConverter" />

    <bean id="myContainer" 
        class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="concurrentConsumers" value="${threadcount}" />
        <property name="connectionFactory" ref="pooledJmsConnectionFactory" />
        <property name="destination" ref="myQueue" />
        <property name="messageListener" ref="myListener" />
        <property name="messageSelector" value="JMSType = 'New'" />
    </bean>

    <bean id="myListener"
        class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
        <constructor-arg>
            <bean class="myapp.MessageListener" />
        </constructor-arg>
        <property name="defaultListenerMethod" value="receive" />
        <property name="messageConverter" ref="simpleMessageConverter" />
    </bean>
    <!-- JMS Listener config Ends -->


    <!-- enable the configuration of transactional behavior based on annotations -->
    <bean id="myJMSMessageSender" class="myapp.JMSMessageSender">
        <property name="jmsTemplate" ref="myQueueTemplate" />
        <property name="jmsQueue" ref="myQueue" />
        <property name="messageConverter" ref="simpleMessageConverter" />
    </bean>


    <bean id="myQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="pooledJmsConnectionFactory" />
    </bean>

</beans>

ActiveMQ 5.9.1 config:

<broker xmlns="http://activemq.apache.org/schema/core" brokerName="instance8161" dataDirectory="${activemq.data}" persistent="false">

        <destinationPolicy>
            <policyMap>
              <policyEntries>
                <policyEntry topic="&gt;">
                    <!-- The constantPendingMessageLimitStrategy is used to prevent
                         slow topic consumers to block producers and affect other consumers
                         by limiting the number of messages that are retained
                         For more information, see:

                         http://activemq.apache.org/slow-consumer-handling.html

                    -->
                  <pendingMessageLimitStrategy>
                    <constantPendingMessageLimitStrategy limit="1000"/>
                  </pendingMessageLimitStrategy>
                </policyEntry>
              </policyEntries>
            </policyMap>
        </destinationPolicy>

        ... <!-- rest is default ActiveMQ Config -->
</broker>

Upvotes: 1

Views: 6238

Answers (1)

Gary Russell
Gary Russell

Reputation: 174484

Most likely, your myapp.MessageListener (or one of its dependencies) is not thread-safe and you are seeing cross-talk across the consumer threads.

Best practice is to craft your listener as stateless (no mutated fields in the class). If that's not possible, you need to protect shared variables with locks.

Upvotes: 3

Related Questions