Reputation: 99
I have a synchronized Mule flow which reads messages from a sonic topic and publish to a Rabbit exchange.
I am loosing messages when the Rabbit is brought up/down. Rabbit exchange is publishing to HA queues. How can I make sure Mule is not consuming the message until proper "Ack" is received from Rabbit Broker? Here is the flow.
<jms:connector name="sonicMQConnectorSub" validateConnections="true" connectionFactory-ref="factorySub" doc:name="JMS" clientId="testClient" durable="true" maxRedelivery="-1" >
<reconnect-forever frequency="30000"/>
</jms:connector>
<spring:beans>
<spring:bean id="soniqMQConnectionFactoryBeanSub" name="factorySub" class="progress.message.jclient.ConnectionFactory">
<spring:property name="connectionURLs" value="tcp://server1:7800" />
<spring:property name="defaultUser" value="user" />
<spring:property name="defaultPassword" value="pass" />
</spring:bean>
</spring:beans>
<amqp:connector name="AMQP" validateConnections="true" host="server2" fallbackAddresses="server3" doc:name="AMQP Connector" port="5672" mandatory="true" activeDeclarationsOnly="true">
<reconnect-forever frequency="30000"/>
</amqp:connector>
<flow name="rabbitFlow1" doc:name="rabbitFlow1" processingStrategy="synchronous">
<jms:inbound-endpoint doc:name="JMS" connector-ref="sonicMQConnectorSub" topic="testtopic"/>
<logger message="Message: #[message.payload]" level="INFO" doc:name="Logger"/>
<amqp:outbound-endpoint exchangeName="rabbitExchange" exchangeDurable="true" responseTimeout="10000" connector-ref="AMQP" doc:name="AMQP" exchangeType="fanout"/>
</flow>
Updated 04/22
Here is the exception trace when Mule is connecting to the 2nd broker. This is when I loose a message.
2014-04-22 09:49:29,453 - org.mule.exception.DefaultSystemExceptionStrategy - ERROR -
********************************************************************************
Message : Connection shutdown detected for: AMQP
Code : MULE_ERROR--2
--------------------------------------------------------------------------------
Exception stack is:
1. Software caused connection abort: recv failed (java.net.SocketException)
java.net.SocketInputStream:-2 (null)
2. connection error; reason: java.net.SocketException: Software caused connection abort: recv failed (com.rabbitmq.client.ShutdownSignalException)
com.rabbitmq.client.impl.AMQConnection:715 (null)
3. Connection shutdown detected for: AMQP (org.mule.transport.ConnectException)
org.mule.transport.amqp.AmqpConnector$1:502 (http://www.mulesoft.org/docs/site/current3/apidocs/org/mule/transport/ConnectException.html)
--------------------------------------------------------------------------------
Root Exception stack trace:
java.net.SocketException: Software caused connection abort: recv failed
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.read(SocketInputStream.java:150)
at java.net.SocketInputStream.read(SocketInputStream.java:121)
+ 3 more (set debug level logging or '-Dmule.verbose.exceptions=true' for everything)
********************************************************************************
2014-04-22 09:49:29,453 - org.mule.exception.DefaultSystemExceptionStrategy - INFO - Exception caught is a ConnectException, attempting to reconnect...
2014-04-22 09:49:29,454 - org.mule.lifecycle.AbstractLifecycleManager - INFO - Stopping connector: AMQP
2014-04-22 09:49:29,454 - org.mule.lifecycle.AbstractLifecycleManager - INFO - Stopping: 'AMQP.dispatcher.1064499250'. Object is: AmqpMessageDispatcher
2014-04-22 09:49:29,454 - org.mule.lifecycle.AbstractLifecycleManager - INFO - Disposing: 'AMQP.dispatcher.1064499250'. Object is: AmqpMessageDispatcher
2014-04-22 09:49:29,455 - org.mule.transport.amqp.AmqpConnector - ERROR - clean connection shutdown; reason: Attempt to use closed connection
2014-04-22 09:49:29,461 - org.mule.transport.amqp.AmqpConnector - INFO - Connected: AmqpConnector
{
name=AMQP
lifecycle=stop
this=33c5919e
numberOfConcurrentTransactedReceivers=4
createMultipleTransactedReceivers=true
connected=true
supportedProtocols=[amqp]
serviceOverrides=<none>
}
2014-04-22 09:49:29,461 - org.mule.transport.amqp.AmqpConnector - INFO - Starting: AmqpConnector
{
name=AMQP
lifecycle=stop
this=33c5919e
numberOfConcurrentTransactedReceivers=4
createMultipleTransactedReceivers=true
connected=true
supportedProtocols=[amqp]
serviceOverrides=<none>
}
2014-04-22 09:49:29,461 - org.mule.lifecycle.AbstractLifecycleManager - INFO - Starting connector: AMQP
Updated 04/23 with the Exception received when JMS Transaction is added to AMQP outbound Endpoint:
Message : No active AMQP transaction found for endpoint: DefaultOutboundEndpoint{endpointUri=amqp://rabbitExchange, connector=AmqpConnector
{
name=AMQP
lifecycle=start
this=25ec1ff7
numberOfConcurrentTransactedReceivers=4
createMultipleTransactedReceivers=true
connected=true
supportedProtocols=[amqp]
serviceOverrides=<none>
}
, name='endpoint.amqp.rabbitExchange', mep=ONE_WAY, properties={exchangeDurable=true, exchangeType=fanout}, transactionConfig=Transaction {factory=org.mule.transport.jms.JmsTransactionFactory@6491b172, action=ALWAYS_JOIN, timeout=30000}, deleteUnacceptedMessages=false, initialState=started, responseTimeout=10000, endpointEncoding=UTF-8, disableTransportTransformer=false}
Code : MULE_ERROR--2
--------------------------------------------------------------------------------
Root Exception stack trace:
org.mule.transaction.IllegalTransactionStateException: No active AMQP transaction found for endpoint: DefaultOutboundEndpoint{endpointUri=amqp://rabbitExchange, connector=AmqpConnector
{
name=AMQP
lifecycle=start
this=25ec1ff7
numberOfConcurrentTransactedReceivers=4
createMultipleTransactedReceivers=true
connected=true
supportedProtocols=[amqp]
serviceOverrides=<none>
}
, name='endpoint.amqp.rabbitExchange', mep=ONE_WAY, properties= {exchangeDurable=true, exchangeType=fanout}, transactionConfig=Transaction {factory=org.mule.transport.jms.JmsTransactionFactory@6491b172, action=ALWAYS_JOIN, timeout=30000}, deleteUnacceptedMessages=false, initialState=started, responseTimeout=10000, endpointEncoding=UTF-8, disableTransportTransformer=false}
at org.mule.transport.amqp.AmqpMessageDispatcher.getEventChannel(AmqpMessageDispatcher.java:298)
at org.mule.transport.amqp.AmqpMessageDispatcher.doOutboundAction(AmqpMessageDispatcher.java:152)
at org.mule.transport.amqp.AmqpMessageDispatcher.doDispatch(AmqpMessageDispatcher.java:127)
+ 3 more (set debug level logging or '-Dmule.verbose.exceptions=true' for everything)
********************************************************************************
2014-04-23 10:52:03,178 - org.mule.transport.jms.JmsTransaction - WARN - Transaction rollback attempted, but no resource bound to org.mule.transport.jms.JmsTransaction@d4ac3d8f-caf6-11e3-bf9a-8b266a026dee [status=STATUS_MARKED_ROLLBACK, key=null, resource=null]
Upvotes: 1
Views: 1009
Reputation: 33413
I see two options:
testtopic
transactionally so if amqp:outbound-endpoint
fails, the message will be redelivered.amqp:outbound-endpoint
with until-successful
to retry the outbound dispatches until the AMQP connector reconnects to RabbitMQ.Upvotes: 1