How to not lose messages when publishing to RabbitMQ from Mule until the proper "Ack" is received?

I have a synchronized Mule flow which reads messages from a sonic topic and publish to a Rabbit exchange.

I am loosing messages when the Rabbit is brought up/down. Rabbit exchange is publishing to HA queues. How can I make sure Mule is not consuming the message until proper "Ack" is received from Rabbit Broker? Here is the flow.

<jms:connector name="sonicMQConnectorSub" validateConnections="true" connectionFactory-ref="factorySub" doc:name="JMS" clientId="testClient" durable="true" maxRedelivery="-1" >
     <reconnect-forever frequency="30000"/>
</jms:connector>
<spring:beans>
    <spring:bean id="soniqMQConnectionFactoryBeanSub" name="factorySub" class="progress.message.jclient.ConnectionFactory">
        <spring:property name="connectionURLs" value="tcp://server1:7800" />
        <spring:property name="defaultUser" value="user" />
        <spring:property name="defaultPassword" value="pass" />
    </spring:bean>
</spring:beans>

<amqp:connector name="AMQP" validateConnections="true" host="server2" fallbackAddresses="server3" doc:name="AMQP Connector" port="5672" mandatory="true" activeDeclarationsOnly="true">
    <reconnect-forever frequency="30000"/>
</amqp:connector>


<flow name="rabbitFlow1" doc:name="rabbitFlow1" processingStrategy="synchronous">
    <jms:inbound-endpoint doc:name="JMS" connector-ref="sonicMQConnectorSub" topic="testtopic"/>

    <logger message="Message: #[message.payload]" level="INFO" doc:name="Logger"/>

    <amqp:outbound-endpoint exchangeName="rabbitExchange" exchangeDurable="true" responseTimeout="10000" connector-ref="AMQP" doc:name="AMQP" exchangeType="fanout"/>
 </flow>

Updated 04/22

Here is the exception trace when Mule is connecting to the 2nd broker. This is when I loose a message.

2014-04-22 09:49:29,453 - org.mule.exception.DefaultSystemExceptionStrategy - ERROR -
********************************************************************************
Message               : Connection shutdown detected for: AMQP
Code                  : MULE_ERROR--2
--------------------------------------------------------------------------------
Exception stack is:
1. Software caused connection abort: recv failed (java.net.SocketException)
java.net.SocketInputStream:-2 (null)
2. connection error; reason: java.net.SocketException: Software caused connection     abort: recv failed (com.rabbitmq.client.ShutdownSignalException)
com.rabbitmq.client.impl.AMQConnection:715 (null)
3. Connection shutdown detected for: AMQP (org.mule.transport.ConnectException)
org.mule.transport.amqp.AmqpConnector$1:502     (http://www.mulesoft.org/docs/site/current3/apidocs/org/mule/transport/ConnectException.html)
--------------------------------------------------------------------------------
Root Exception stack trace:
java.net.SocketException: Software caused connection abort: recv failed
    at java.net.SocketInputStream.socketRead0(Native Method)
    at java.net.SocketInputStream.read(SocketInputStream.java:150)
    at java.net.SocketInputStream.read(SocketInputStream.java:121)
+ 3 more (set debug level logging or '-Dmule.verbose.exceptions=true' for everything)
********************************************************************************

2014-04-22 09:49:29,453 - org.mule.exception.DefaultSystemExceptionStrategy - INFO -    Exception caught is a ConnectException, attempting to reconnect...
2014-04-22 09:49:29,454 - org.mule.lifecycle.AbstractLifecycleManager - INFO - Stopping   connector: AMQP

2014-04-22 09:49:29,454 - org.mule.lifecycle.AbstractLifecycleManager - INFO -   Stopping: 'AMQP.dispatcher.1064499250'. Object is: AmqpMessageDispatcher
2014-04-22 09:49:29,454 - org.mule.lifecycle.AbstractLifecycleManager - INFO -  Disposing: 'AMQP.dispatcher.1064499250'. Object is: AmqpMessageDispatcher
2014-04-22 09:49:29,455 - org.mule.transport.amqp.AmqpConnector - ERROR - clean  connection shutdown; reason: Attempt to use closed connection
2014-04-22 09:49:29,461 - org.mule.transport.amqp.AmqpConnector - INFO - Connected:    AmqpConnector
{
name=AMQP
lifecycle=stop
this=33c5919e
numberOfConcurrentTransactedReceivers=4
createMultipleTransactedReceivers=true
connected=true
supportedProtocols=[amqp]
serviceOverrides=<none>
}

2014-04-22 09:49:29,461 - org.mule.transport.amqp.AmqpConnector - INFO - Starting:   AmqpConnector
{
name=AMQP
lifecycle=stop
this=33c5919e
numberOfConcurrentTransactedReceivers=4
createMultipleTransactedReceivers=true
connected=true
supportedProtocols=[amqp]
serviceOverrides=<none>
}

2014-04-22 09:49:29,461 - org.mule.lifecycle.AbstractLifecycleManager - INFO - Starting  connector: AMQP

Updated 04/23 with the Exception received when JMS Transaction is added to AMQP outbound Endpoint:

Message               : No active AMQP transaction found for endpoint:      DefaultOutboundEndpoint{endpointUri=amqp://rabbitExchange, connector=AmqpConnector
{
name=AMQP
lifecycle=start
this=25ec1ff7
numberOfConcurrentTransactedReceivers=4
createMultipleTransactedReceivers=true
connected=true
supportedProtocols=[amqp]
serviceOverrides=<none>
}
,  name='endpoint.amqp.rabbitExchange', mep=ONE_WAY, properties={exchangeDurable=true,     exchangeType=fanout}, transactionConfig=Transaction   {factory=org.mule.transport.jms.JmsTransactionFactory@6491b172, action=ALWAYS_JOIN,   timeout=30000}, deleteUnacceptedMessages=false, initialState=started, responseTimeout=10000, endpointEncoding=UTF-8, disableTransportTransformer=false}
Code                  : MULE_ERROR--2

--------------------------------------------------------------------------------
Root Exception stack trace:
org.mule.transaction.IllegalTransactionStateException: No active AMQP transaction  found for endpoint: DefaultOutboundEndpoint{endpointUri=amqp://rabbitExchange,   connector=AmqpConnector
{
name=AMQP
lifecycle=start
this=25ec1ff7
numberOfConcurrentTransactedReceivers=4
createMultipleTransactedReceivers=true
connected=true
supportedProtocols=[amqp]
serviceOverrides=<none>
}
,  name='endpoint.amqp.rabbitExchange', mep=ONE_WAY, properties= {exchangeDurable=true, exchangeType=fanout}, transactionConfig=Transaction {factory=org.mule.transport.jms.JmsTransactionFactory@6491b172, action=ALWAYS_JOIN,  timeout=30000}, deleteUnacceptedMessages=false, initialState=started,   responseTimeout=10000, endpointEncoding=UTF-8, disableTransportTransformer=false}
at org.mule.transport.amqp.AmqpMessageDispatcher.getEventChannel(AmqpMessageDispatcher.java:298)
at org.mule.transport.amqp.AmqpMessageDispatcher.doOutboundAction(AmqpMessageDispatcher.java:152)
at org.mule.transport.amqp.AmqpMessageDispatcher.doDispatch(AmqpMessageDispatcher.java:127)
+ 3 more (set debug level logging or '-Dmule.verbose.exceptions=true' for everything)
********************************************************************************

2014-04-23 10:52:03,178 - org.mule.transport.jms.JmsTransaction - WARN - Transaction   rollback attempted, but no resource bound to   org.mule.transport.jms.JmsTransaction@d4ac3d8f-caf6-11e3-bf9a-8b266a026dee  [status=STATUS_MARKED_ROLLBACK, key=null, resource=null]

Upvotes: 1

Views: 1009

Answers (1)

David Dossot
David Dossot

Reputation: 33413

I see two options:

  • Make the JMS client a durable one and consume testtopic transactionally so if amqp:outbound-endpoint fails, the message will be redelivered.
  • Wrap the amqp:outbound-endpoint with until-successful to retry the outbound dispatches until the AMQP connector reconnects to RabbitMQ.

Upvotes: 1

Related Questions