Reputation: 355
I realized, even after explicitly setting the property
<property name="cacheLevelName" value="CACHE_CONSUMER"/>
my consumer is still removed.
See log below:
23:14:25,180 | Usage | 49 - org.apache.activemq.activemq-core - 5.5.0.fuse-00-43 | Main:memory:queue://reply:memory: usage change from: 0% of available memory, to: 1% of available memory
23:14:25,180 | Queue | 49 - org.apache.activemq.activemq-core - 5.5.0.fuse-00-43 | reply toPageIn: 1, Inflight: 0, pagedInMessages.size 2, enqueueCount: 2, dequeueCount: 1
23:14:25,180 | Queue | 49 - org.apache.activemq.activemq-core - 5.5.0.fuse-00-43 | reply toPageIn: 0, Inflight: 0, pagedInMessages.size 3, enqueueCount: 2, dequeueCount: 1
23:14:25,180 | Queue | 49 - org.apache.activemq.activemq-core - 5.5.0.fuse-00-43 | reply toPageIn: 0, Inflight: 0, pagedInMessages.size 3, enqueueCount: 2, dequeueCount: 1
23:14:25,649 | ActiveMQMessageConsumer | 49 - org.apache.activemq.activemq-core - 5.5.0.fuse-00-43 | remove: ID:TestESB-3364-1321307776420-8:1:1:84, lastDeliveredSequenceId:0
23:14:25,649 | AbstractRegion | 49 - org.apache.activemq.activemq-core - 5.5.0.fuse-00-43 | default removing consumer: ID:TestESB-3364-1321307776420-8:1:1:84 for destination: queue://routerResponse
23:14:25,649 | Queue | 49 - org.apache.activemq.activemq-core - 5.5.0.fuse-00-43 | queue://routerResponse remove sub: QueueSubscription: consumer=ID:TestESB-3364-1321307776420-8:1:1:84, destinations=1, dispatched=0, delivered=0, pending=0, lastDeliveredSeqId: 0, dequeues: 0, dispatched: 0, inflight: 0
23:14:25,649 | AbstractRegion | 49 - org.apache.activemq.activemq-core - 5.5.0.fuse-00-43 | default adding consumer: ID:TestESB-3364-1321307776420-8:1:1:85 for destination: queue://routerResponse
23:14:25,649 | Queue | 49 - org.apache.activemq.activemq-core - 5.5.0.fuse-00-43 | queue://routerResponse add sub: QueueSubscription: consumer=ID:TestESB-3364-1321307776420-8:1:1:85, destinations=0, dispatched=0, delivered=0, pending=0, dequeues: 0, dispatched: 0, inflight: 0
23:14:26,165 | ActiveMQMessageConsumer | 49 - org.apache.activemq.activemq-core - 5.5.0.fuse-00-43 | remove: ID:TestESB-3364-1321307776420-14:1:1:4, lastDeliveredSequenceId:0
23:14:26,165 | AbstractRegion | 49 - org.apache.activemq.activemq-core - 5.5.0.fuse-00-43 | default removing consumer: ID:TestESB-3364-1321307776420-14:1:1:4 for destination: queue://reply
23:14:26,165 | Queue | 49 - org.apache.activemq.activemq-core - 5.5.0.fuse-00-43 | queue://reply remove sub: QueueSubscription: consumer=ID:TestESB-3364-1321307776420-14:1:1:4, destinations=1, dispatched=0, delivered=0, pending=0, lastDeliveredSeqId: 0, dequeues: 1, dispatched: 1, inflight: 0
23:14:26,165 | Queue | 49 - org.apache.activemq.activemq-core - 5.5.0.fuse-00-43 | reply toPageIn: 0, Inflight: 0, pagedInMessages.size 3, enqueueCount: 2, dequeueCount: 1
23:14:26,165 | AbstractRegion | 49 - org.apache.activemq.activemq-core - 5.5.0.fuse-00-43 | default adding consumer: ID:TestESB-3364-1321307776420-14:1:1:5 for destination: queue://reply
23:14:26,165 | Queue | 49 - org.apache.activemq.activemq-core - 5.5.0.fuse-00-43 | queue://reply add sub: QueueSubscription: consumer=ID:TestESB-3364-1321307776420-14:1:1:5, destinations=0, dispatched=0, delivered=0, pending=0, dequeues: 1, dispatched: 1, inflight: 0
23:14:26,165 | Queue | 49 - org.apache.activemq.activemq-core - 5.5.0.fuse-00-43 | reply toPageIn: 0, Inflight: 0, pagedInMessages.size 3, enqueueCount: 2, dequeueCount: 1
23:14:26,649 | ActiveMQMessageConsumer | 49 - org.apache.activemq.activemq-core - 5.5.0.fuse-00-43 | remove: ID:TestESB-3364-1321307776420-8:1:1:85, lastDeliveredSequenceId:0
See my camel spring config below:
<bean id="providerConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<!--<property name="brokerURL"><value>${aero-provider-broker-url}</value></property>-->
<bean id="providerActiveMQConfig" class="org.apache.activemq.camel.component.ActiveMQConfiguration">
<property name="connectionFactory" ref="providerConnectionFactory"/>
<property name="cacheLevelName" value="CACHE_CONSUMER"/>
<property name="concurrentConsumers" value="${jms-concurrent-consumers}"/>
<property name="requestTimeout" value="${jms-request-timeout}"/>
<property name="priority" value="${jms-message-priority}"/>
</bean>
<bean id="providerActivemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
<property name="configuration" ref="providerActiveMQConfig"/>
<property name="brokerURL"><value>${provider-broker-url}</value></property>
</bean>
Edit here >>>>
BEan definition:
<bean id="localJMSConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="alwaysSessionAsync" value="false"/>
<property name="alwaysSyncSend" value="true"/>
<property name="brokerURL"><value>${local-broker-url}</value></property>
<property name="closeTimeout" value="150000"/>
<property name="copyMessageOnSend" value="true"/>
<property name="disableTimeStampsByDefault" value="false"/>
<property name="dispatchAsync" value="false"/>
<property name="objectMessageSerializationDefered" value="false"/>
<property name="optimizeAcknowledge" value="false"/>
<property name="optimizedMessageDispatch" value="true"/>
<property name="producerWindowSize" value="0"/>
<property name="statsEnabled" value="false"/>
<property name="useAsyncSend" value="false"/>
<property name="useCompression" value="false"/>
<property name="sendTimeout" value="0"/>
</bean>
<bean id="localJMSTransactionManager" class="org.springframework.jms.connection.JmsTransactionManager">
<property name="connectionFactory" ref="localJMSConnectionFactory" />
</bean>
<bean id="localRequestJMSConfig" class="org.apache.camel.component.jms.JmsConfiguration">
<property name="connectionFactory" ref="localJMSConnectionFactory"/>
<property name="concurrentConsumers" value="${jms-concurrent-consumers}"/>
<property name="deliveryPersistent" value="true"/>
<property name="priority" value="9"/>
</bean>
<bean id="localRequestJMS" class="org.apache.camel.component.jms.JmsComponent">
<property name="configuration" ref="localRequestJMSConfig"/>
</bean>
Camel Enpoint Definition:
<camel:endpoint id="jmsEndpoint" camelContextId="bohProcessorCC" uri="localRequestJMS:queue:${boh-processor-queue}"/>
<camel:endpoint id="providerEndpoint" camelContextId="bohProcessorCC" uri="providerActivemq:queue:${generic-http-provider-queue}?exchangePattern=InOut&requestTimeout=${jms-request-timeout}&replyTo=reply"/>
Camel Route:
<from ref="jmsEndpoint"/>
<bean ref="service" method="getSignInRequest"/>
<transform><simple>XML=${in.body}</simple></transform>
<inOut ref="providerEndpoint"/>
<bean ref="service" method="getSignInResponseData"/>
<bean ref="utilityServicesBean" method="process"/>
<bean ref="service" method="getPassword"/>
<inOut ref="providerEndpoint"/>
<bean ref="camelPropertyEnricherBean" method="addCustReference"/>
<bean ref="camelPropertyEnricherBean" method="normalizeXML"/>
<inOut ref="providerEndpoint"/>
On the second call, the exchange times out, usually saying timeout. Typically there is response on the reply queue but the log says correlation id doesnt match. I noticed that if I ask camel to set the message id as correlation id just before calling the provider endpoint, it usually returns with a message. Even that fails sometimes when there are high traffic.
Upvotes: 2
Views: 2159
Reputation: 55540
You use request/reply over JMS with Camel, eg yo do an inOut ref="providerEndpoint", which also uses a fixed replyTo queue name, as you specify replyTo=reply.
That means Camel forces to uses CACHE_SESSION as the reply queue is considered a shared reply queue, and Camel uses a JMS Message selector to pickup the expected reply message based on correlation IDs. And for the JMS message selector to be updated, you need to re-create the consumer. And that is why you see the a consumer is removed in the logs. Its that reply consumer.
In Camel 2.9 onwards we introduced a replyToType option, which you can set to replyToType=Exclusive. And then the reply queue is considered exclusive, which means Camel uses CACHE_CONSUMER instead.
Upvotes: 2