Reputation: 21
I'm wondering if anyone has run into this issue when using the combination of Atomikos + Camel + ActiveMQ Classic. I am using this combo to peel off messages from a queue in a transacted fashion. It seems to work well.
The problem is that I now need to turn on advisory messages in ActiveMQ. After I did that, I noticed that connections to all the transacted queues are constantly being recreated. This is evidenced by the flooding of the ActiveMQ.Advisory.Consumer.Queue topics. It is also apparent in the DEBUG logging: the consumer continually creates a connection, opens a transaction, commits it, and closes the connection. This happens without any application-generated messages. None of the non-transacted queues/topics have this issue. I have read in a few other posts that connection pooling and caching can alleviate this. It appears that I shouldn't use caching, and that I'm already pooling connections. I am using this config:
<bean id="txq" class="org.apache.activemq.camel.component.ActiveMQComponent">
    <property name="configuration" ref="txJmsConfig" />
</bean>
<bean id="txJmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
    <property name="connectionFactory" ref="atomikosConnectionFactory" />
    <property name="concurrentConsumers" value="1" />
    <property name="transacted" value="true" />
    <property name="maxConcurrentConsumers" value="${consumers.concurrent.max}" />
    <property name="transactionManager" ref="jtaTransactionManager" />
    <property name="cacheLevelName" value="CACHE_NONE" />
</bean>
<bean id="atomikosConnectionFactory" class="com.atomikos.jms.AtomikosConnectionFactoryBean"
      init-method="init" destroy-method="close">
    <property name="uniqueResourceName">
        <value>XA-JMS-ATOMIKOS</value>
    </property>
    <property name="localTransactionMode">
        <value>false</value>
    </property>
    <property name="poolSize">
        <value>4</value>
    </property>
    <property name="xaConnectionFactory">
        <ref bean="xaJmsConnectionFactory" />
    </property>
</bean>
<bean id="xaJmsConnectionFactory" class="org.apache.activemq.ActiveMQXAConnectionFactory">
    <property name="brokerURL"
              value="${queue.address}?jms.watchTopicAdvisories=false&amp;jms.prefetchPolicy.all=0" />
</bean>
<bean id="userTransactionService" class="com.atomikos.icatch.config.UserTransactionServiceImp"
      init-method="init" destroy-method="shutdownForce">
    <constructor-arg>
        <props>
            <prop key="com.atomikos.icatch.service">
                com.atomikos.icatch.standalone.UserTransactionServiceFactory
            </prop>
            <prop key="com.atomikos.icatch.max_actives">${batch.transactions.concurrent.max}</prop>
        </props>
    </constructor-arg>
</bean>
<bean id="atomikosTransactionManager" class="com.atomikos.icatch.jta.UserTransactionManager"
      init-method="init" destroy-method="close" depends-on="userTransactionService">
    <property name="forceShutdown" value="false" />
</bean>
<bean id="atomikosUserTransaction" class="com.atomikos.icatch.jta.UserTransactionImp">
    <property name="transactionTimeout" value="300" />
</bean>
<bean id="jtaTransactionManager"
      class="org.springframework.transaction.jta.JtaTransactionManager">
    <property name="transactionManager" ref="atomikosTransactionManager" />
    <property name="userTransaction" ref="atomikosUserTransaction" />
</bean>
It uses the AtomikosConnectionFactoryBean, which I thought implemented pooling. Maybe I'm wrong on that? I'd love to hear if anyone else is in this boat with me, and what they did to fix it.
@PeterSmith suggested implementation
Petter, thanks for your suggestion. I changed my config to use the XaPooledConnectionFactory, but Spring is not pleased with this: it thinks that XaPooledConnectionFactory does not implement XAConnectionFactory.
<bean id="atomikosConnectionFactory" class="com.atomikos.jms.AtomikosConnectionFactoryBean"
      init-method="init" destroy-method="close" depends-on="xaJmsPooledConnectionFactory">
    <property name="uniqueResourceName">
        <value>XA-JMS-ATOMIKOS</value>
    </property>
    <property name="localTransactionMode">
        <value>false</value>
    </property>
    <property name="maxPoolSize">
        <value>32</value>
    </property>
    <property name="xaConnectionFactory">
        <ref bean="xaJmsPooledConnectionFactory" />
    </property>
</bean>
<bean id="xaJmsPooledConnectionFactory" class="org.apache.activemq.pool.XaPooledConnectionFactory"
      init-method="start" destroy-method="stop" depends-on="xaJmsConnectionFactory">
    <property name="maxConnections" value="2" />
    <property name="connectionFactory" ref="xaJmsConnectionFactory" />
</bean>
<bean id="xaJmsConnectionFactory" class="org.apache.activemq.ActiveMQXAConnectionFactory">
    <property name="brokerURL"
              value="${queue.address}?jms.watchTopicAdvisories=false&amp;jms.prefetchPolicy.all=0" />
</bean>
java.lang.IllegalStateException: Cannot convert value of type [org.apache.activemq.pool.XaPooledConnectionFactory] to required type [javax.jms.XAConnectionFactory] for property 'xaConnectionFactory': no matching editors or conversion strategy found
The docs state that the XaPooledConnectionFactory class implements javax.jms.XAConnectionFactory, so I'm a bit lost at this point. It seems that this should work.
Upvotes: 2
Views: 1310
Reputation: 22279
I have seen this as well using the above combination.
It seems that the Spring DMLC (DefaultMessageListenerContainer, which implements the Camel JMS consumer) uses a loop of consumer.receive() calls so that it can enlist each read operation in an XA transaction.
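One setting that bears on this polling loop is the receive timeout. The following is only a sketch, assuming that the receiveTimeout option of Camel's JmsConfiguration (in milliseconds) is what bounds each receive() call in this setup. The value 30000 is illustrative and does not come from the original configuration. With CACHE_NONE, a longer timeout should mean fewer connect/receive/close cycles on an idle queue, and so fewer consumer advisories. It should stay well below the transaction timeout (300 seconds in the question's config).

```xml
<bean id="txJmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
    <property name="connectionFactory" ref="atomikosConnectionFactory" />
    <property name="transacted" value="true" />
    <property name="transactionManager" ref="jtaTransactionManager" />
    <property name="cacheLevelName" value="CACHE_NONE" />
    <!-- Assumption: illustrative value only; each idle poll now blocks for up
         to 30 s before the consumer is torn down and recreated. -->
    <property name="receiveTimeout" value="30000" />
</bean>
```

The trade-off is that a consumer blocked in a long receive() may take longer to notice a shutdown request.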
Without XA, caching in the DMLC, or even using the SMLC (SimpleMessageListenerContainer, which does not poll), is possible.
Try wrapping your ActiveMQ connection factory in an org.apache.activemq.pool.PooledConnectionFactory and see if that helps somewhat.
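A minimal sketch of what that wrapping could look like if XA is not strictly required. It swaps the XA setup for local JMS transactions through Spring's JmsTransactionManager, so it no longer coordinates with other transactional resources. The bean ids (jmsConnectionFactory, pooledConnectionFactory, jmsTransactionManager, localTxJmsConfig) are hypothetical, and CACHE_CONSUMER is an assumption about what suits this workload.

```xml
<!-- Plain (non-XA) ActiveMQ connection factory; bean ids are hypothetical -->
<bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="${queue.address}" />
</bean>

<!-- Pool that hands out reusable connections instead of opening new ones -->
<bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
      init-method="start" destroy-method="stop">
    <property name="maxConnections" value="2" />
    <property name="connectionFactory" ref="jmsConnectionFactory" />
</bean>

<!-- Local JMS transactions only: no XA, no Atomikos -->
<bean id="jmsTransactionManager"
      class="org.springframework.jms.connection.JmsTransactionManager">
    <property name="connectionFactory" ref="pooledConnectionFactory" />
</bean>

<bean id="localTxJmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
    <property name="connectionFactory" ref="pooledConnectionFactory" />
    <property name="transacted" value="true" />
    <property name="transactionManager" ref="jmsTransactionManager" />
    <!-- Assumption: keeping the consumer cached avoids recreating it per poll -->
    <property name="cacheLevelName" value="CACHE_CONSUMER" />
</bean>
```

With the consumer cached, an idle queue should no longer produce a stream of consumer advisories, at the cost of losing two-phase commit across resources.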
Upvotes: 1