Craig Warren

Reputation: 1665

ActiveMQ stops dispatching messages when processing a large number of messages

I have a system built on Spring JMS and ActiveMQ (5.6) which has about 12 Spring DefaultMessageListenerContainers (each with up to 20 concurrent instances), all connected to the same ActiveMQ destination (a queue).

Each handler (container) uses a selector to take only the messages addressed to it from the queue, does its work, and then puts the message back on the queue. This repeats until all the work is complete.
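
A minimal sketch of that selector pattern with a stock Spring container, for readers unfamiliar with it. It is not the configuration used in this system: the `handler` header name, its value, the bean id, and the consumer counts are assumptions chosen for illustration.

    <!-- Sketch only: the 'handler' header and its value are hypothetical -->
    <bean id="dmsReadContainerSketch"
          class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destination" ref="handlersDest" />
        <property name="messageListener" ref="dmsReadHandler" />
        <!-- Only messages whose 'handler' header equals 'dmsRead' reach this container -->
        <property name="messageSelector" value="handler = 'dmsRead'" />
        <!-- Illustrative counts: start with 1 consumer, scale up to 20 -->
        <property name="concurrentConsumers" value="1" />
        <property name="maxConcurrentConsumers" value="20" />
    </bean>

With this arrangement every handler's consumers compete on the same queue, and the broker decides which messages each consumer sees by evaluating its selector.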

I am doing a benchmarking test sending 25,000 messages which each need to pass through 9 different handlers.

Every time I run the test, only around 11300 messages make it through all of my handlers, and then ActiveMQ dispatches no further messages.

At the end of my current test I can see the following stats for my queue:

    Enqueue Count:  120359
    Dequeue Count:  106693
    Dispatch Count: 106693
    Inflight Count: 0
    Queue Size:     13666

So 13666 messages (120359 - 106693) are still on the queue, with none in flight.

Active MQ dispatches no more messages unless I restart the broker.

Below is my ActiveMQ configuration:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:amq="http://activemq.apache.org/schema/core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.5.0.xsd">

    <bean id="propertyConfigurer" class="org.springframework.web.context.support.ServletContextPropertyPlaceholderConfigurer" />
    <!-- The <broker> element is used to configure the ActiveMQ broker. -->
    <broker xmlns="http://activemq.apache.org/schema/core"
        brokerName="jmsDeployMqBroker" dataDirectory="${java.io.tmpdir}/activemq-data"
        destroyApplicationContextOnStop="true" useJmx="true">
        <destinationPolicy>
            <policyMap>
                <policyEntries>
                    <policyEntry topic=">" producerFlowControl="false">
                    </policyEntry>
                    <policyEntry queue=">" producerFlowControl="false">
                    </policyEntry>
                </policyEntries>
            </policyMap>
        </destinationPolicy>

        <destinations>
            <queue physicalName="handlersDest"/>
            <topic physicalName="notificationsDest"  />
            <queue physicalName="ActiveMQ.DLQ" />
        </destinations>

        <!-- The managementContext is used to configure how ActiveMQ is exposed 
            in JMX. By default, ActiveMQ uses the MBean server that is started by the 
            JVM. For more information, see: http://activemq.apache.org/jmx.html -->
        <managementContext>
            <managementContext createConnector="false" />
        </managementContext>

        <!-- Configure message persistence for the broker. The default persistence 
            mechanism is the KahaDB store (identified by the kahaDB tag). For more information, 
            see: http://activemq.apache.org/persistence.html -->
        <persistenceAdapter>
            <amq:kahaPersistenceAdapter directory="${java.io.tmpdir}/activemq-data/kahadb" maxDataFileLength="1g" />
        </persistenceAdapter>

        <!-- The transport connectors expose ActiveMQ over a given protocol to 
            clients and other brokers. For more information, see: http://activemq.apache.org/configuring-transports.html -->
        <transportConnectors>
            <transportConnector name="openwire" uri="${org.apache.activemq.brokerURL}" />
        </transportConnectors>
    </broker>
</beans>

And here is a sample of my Spring config for a handler:

<jee:jndi-lookup id="connectionFactory" jndi-name="${jndi.jms.connfactory}">
        <jee:environment>
            java.naming.factory.initial = ${jndi.jms.naming.factory.initial}
            java.naming.provider.url = ${jndi.jms.naming.url}
        </jee:environment>
    </jee:jndi-lookup>

    <!-- ID must not change as it is used in autowiring the handlers -->
    <jee:jndi-lookup id="handlersDest"  jndi-name="${jndi.docprod.queue}">
        <jee:environment>
            java.naming.factory.initial = ${jndi.jms.naming.factory.initial}
            java.naming.provider.url = ${jndi.jms.naming.url}
            ${jndi.queue.setup}
        </jee:environment>
    </jee:jndi-lookup>

<!-- ID must not change as it is used in autowiring the handlers --> 
    <jee:jndi-lookup id="notificationsDest" jndi-name="${jndi.docprod.topic}">
            <jee:environment>
            java.naming.factory.initial = ${jndi.jms.naming.factory.initial}
            java.naming.provider.url = ${jndi.jms.naming.url}
            ${jndi.topic.setup}
        </jee:environment>
    </jee:jndi-lookup>

    <bean id="dmsReadContainer" class="mydomain.DocProdnMessageListenerContainer"
          p:connectionFactory-ref="connectionFactory"
          p:handlerClass="mydomain.DmsReadHandler"
          p:messageListener-ref="dmsReadHandler"
          p:destination-ref="handlersDest" >
          <property name="concurrentConsumers"><value>${dmsRead.initialInstances}</value></property>
          <property name="maxConcurrentConsumers"><value>${dmsRead.maxInstances}</value></property>
          <property name="idleConsumerLimit"><value>${dmsRead.idleInstances}</value></property>
          </bean>       

    <bean id="dmsReadHandler" class="mydomain.DmsReadHandler">

    </bean>
...

ActiveMQ's log file doesn't show anything out of the ordinary which would suggest why it stops dispatching.

Does anyone know why no further messages would be dispatched or have any suggestions to further diagnose the problem?

Upvotes: 4

Views: 3197

Answers (2)

Craig Warren

Reputation: 1665

This appears to be a problem in ActiveMQ itself.

There is a current Jira for it: https://issues.apache.org/jira/browse/AMQ-2745
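
If the stall is the selector-related kind, where the broker pages only a limited window of messages into memory and none of them match the selector of an idle consumer, one workaround that is often suggested is to widen that window with `maxPageSize` on the queue policy. The sketch below reuses the policy entries from the question. The value 20000 is an arbitrary illustration, not a tested recommendation, and a larger page keeps more messages in broker memory.

    <destinationPolicy>
        <policyMap>
            <policyEntries>
                <policyEntry topic=">" producerFlowControl="false" />
                <!-- maxPageSize value is illustrative only -->
                <policyEntry queue=">" producerFlowControl="false" maxPageSize="20000" />
            </policyEntries>
        </policyMap>
    </destinationPolicy>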

Upvotes: 1

Ben ODay

Reputation: 21005

I'd try the following to open things up a bit...

  1. Increase the systemUsage limits from the defaults:

    <systemUsage>
      <systemUsage>
        <memoryUsage><memoryUsage limit="4 gb"/></memoryUsage> <!--75% of avail heap-->
        <storeUsage><storeUsage limit="10 gb"/></storeUsage>
        <tempUsage><tempUsage limit="10 gb"/></tempUsage>
      </systemUsage>
    </systemUsage> 
    
  2. Use a JMS connection pool:

    <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
       <property name="maxConnections" value="8" />
       <property name="connectionFactory" ref="jmsConnectionFactory" />
    </bean>
    
  3. Switch from kahaPersistenceAdapter to KahaDB, like this:

    <persistenceAdapter>
        <kahaDB directory="activemq-data" journalMaxFileLength="32mb" enableJournalDiskSyncs="false"/>
    </persistenceAdapter>
    
  4. Lower the prefetch on the client connection URL:

    tcp://localhost:61616?jms.prefetchPolicy.all=10
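
Steps 2 and 4 can be combined on the client side. The sketch below assumes the connection factory is declared directly in Spring rather than looked up through JNDI as in the question, and that the broker listens on `tcp://localhost:61616`. The `jmsConnectionFactory` bean is the one the pool in step 2 refers to but does not define.

    <!-- Plain ActiveMQ factory; the broker URL carries the lowered prefetch from step 4 -->
    <bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <property name="brokerURL" value="tcp://localhost:61616?jms.prefetchPolicy.all=10" />
    </bean>

    <!-- Pool from step 2 wrapping the factory above; stop() releases pooled connections on shutdown -->
    <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
          destroy-method="stop">
        <property name="maxConnections" value="8" />
        <property name="connectionFactory" ref="jmsConnectionFactory" />
    </bean>

The listener containers would then reference `pooledConnectionFactory` in place of the JNDI-provided `connectionFactory`.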
    

Upvotes: 2
