user2001388
user2001388

Reputation: 21

ActiveMQ.Advisory.Consumer.Queue topic flood using Spring, Camel, and Atomikos

I'm wondering if anyone has run into this issue when using the combination of Atomikos + Camel + ActiveMQ Classic. I am using this combo to peel off messages from a queue in a transacted fashion. It seems to work well.

The problem is that I am now in a situation where I need to turn on advisory messages in ActiveMQ. After I did that, I noticed that all the queues are constantly having connections recreated. This is evidenced by the flooding of the ActiveMQ.Advisory.Consumer.Queue topics. It is also apparent in the DEBUG logging, as it continually creates a connection, opens a transaction, commits it, and closes the connection. This happens without any actual application generated messages. All the other non-transacted queues/topics do not have this issue. I have read in a few other posts that connection pooling and caching can alleviate this issue. It appears that I shouldn't use caching and that I'm already connection pooling. I am using this config:

<bean id="txq" class="org.apache.activemq.camel.component.ActiveMQComponent">
    <property name="configuration" ref="txJmsConfig" />
</bean>
<bean id="txJmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
    <property name="connectionFactory" ref="atomikosConnectionFactory" />
    <property name="concurrentConsumers" value="1" />
    <property name="transacted" value="true" />
    <property name="maxConcurrentConsumers" value="${consumers.concurrent.max}" />
    <property name="transactionManager" ref="jtaTransactionManager" />
    <property name="cacheLevelName" value="CACHE_NONE" />
</bean>
<bean id="atomikosConnectionFactory" class="com.atomikos.jms.AtomikosConnectionFactoryBean"
    init-method="init" destroy-method="close">
    <property name="uniqueResourceName">
        <value>XA-JMS-ATOMIKOS</value>
    </property>
    <property name="localTransactionMode">
        <value>false</value>
    </property>
    <property name="poolSize">
        <value>4</value>
    </property>
    <property name="xaConnectionFactory">
        <ref bean="xaJmsConnectionFactory" />
    </property>
</bean>
<bean id="xaJmsConnectionFactory" class="org.apache.activemq.ActiveMQXAConnectionFactory">
    <property name="brokerURL"
        value="${queue.address}?jms.watchTopicAdvisories=false&amp;jms.prefetchPolicy.all=0" />
</bean>
<bean id="userTransactionService" class="com.atomikos.icatch.config.UserTransactionServiceImp"
    init-method="init" destroy-method="shutdownForce">
    <constructor-arg>
        <props>
            <prop key="com.atomikos.icatch.service">
                com.atomikos.icatch.standalone.UserTransactionServiceFactory
            </prop>
            <prop key="com.atomikos.icatch.max_actives">${batch.transactions.concurrent.max}</prop>
        </props>
    </constructor-arg>
</bean>
<bean id="atomikosTransactionManager" class="com.atomikos.icatch.jta.UserTransactionManager"
    init-method="init" destroy-method="close" depends-on="userTransactionService">
    <property name="forceShutdown" value="false" />
</bean>
<bean id="atomikosUserTransaction" class="com.atomikos.icatch.jta.UserTransactionImp">
    <property name="transactionTimeout" value="300" />
</bean>
<bean id="jtaTransactionManager"
    class="org.springframework.transaction.jta.JtaTransactionManager">
    <property name="transactionManager" ref="atomikosTransactionManager" />
    <property name="userTransaction" ref="atomikosUserTransaction" />
</bean>

It uses the AtomikosConnectionFactoryBean which I thought implemented pooling. Maybe I'm wrong on that? I'd love to hear if anyone else is in this boat with me, and what they did to fix it.

@PeterSmith suggested implementation

Petter, thanks for your suggestion. I changed my config to use the XaPooledConnectionFactory. Spring is not pleased with this. It thinks that XaPooledConnectionFactory does not implement XAConnectionFactory.

<bean id="atomikosConnectionFactory" class="com.atomikos.jms.AtomikosConnectionFactoryBean"
    init-method="init" destroy-method="close" depends-on="xaJmsPooledConnectionFactory">
    <property name="uniqueResourceName">
        <value>XA-JMS-ATOMIKOS</value>
    </property>
    <property name="localTransactionMode">
        <value>false</value>
    </property>
    <property name="maxPoolSize">
        <value>32</value>
    </property>
    <property name="xaConnectionFactory">
        <ref bean="xaJmsPooledConnectionFactory" />
    </property>
</bean>
<bean id="xaJmsPooledConnectionFactory" class="org.apache.activemq.pool.XaPooledConnectionFactory"
     init-method="start" destroy-method="stop" depends-on="xaJmsConnectionFactory">
    <property name="maxConnections" value="2" />
    <property name="connectionFactory" ref="xaJmsConnectionFactory" />
</bean> 
<bean id="xaJmsConnectionFactory" class="org.apache.activemq.ActiveMQXAConnectionFactory">
    <property name="brokerURL"
        value="${queue.address}?jms.watchTopicAdvisories=false&amp;jms.prefetchPolicy.all=0" />
</bean>
java.lang.IllegalStateException: Cannot convert value of type [org.apache.activemq.pool.XaPooledConnectionFactory] to required type [javax.jms.XAConnectionFactory] for property 'xaConnectionFactory': no matching editors or conversion strategy found

The docs state that the XaPooledConnectionFactory class implements javax.jms.XAConnectionFactory, so I'm a bit lost at this point. It seems that should work.

Upvotes: 2

Views: 1310

Answers (1)

Petter Nordlander
Petter Nordlander

Reputation: 22279

I have seen this as well using the above combination.

It seems to be that the Spring DMLC (which is implementing the camel jms consumer) is using a loop of consumer.receive() to be able to enlist the read operation in a XA transaction.

Without XA caching in the DMLC or even usage of the SMLC which is not polling, is possible.

Try wrapping your activemq CF in a org.apache.activemq.pool.PooledConnectionFactory and see if that helps somewhat.

Upvotes: 1

Related Questions