JohnJ

Reputation: 4833

How to tolerate RabbitMQ restarts in Langohr?

We have Clojure code which reads from a Rabbit queue. We would like to tolerate the case where the RabbitMQ server is down briefly, e.g. in the case of a restart (sudo service rabbitmq-server restart).

Langohr appears to have some provision for reconnecting. We adapted the example clojurewerkz.langohr.examples.recovery.example1 (Gist here). Our version differs slightly from the published example: the connection parameters are our own, and we removed the lb/publish call, since an external source fills the queue.

We can successfully consume data from the queue and wait for more messages. However, when we restart RMQ (via the above sudo command on the VM hosting RabbitMQ), the following exception is thrown:

Caught an exception during connection recovery!
java.io.IOException
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
    at com.rabbitmq.client.impl.AMQConnection.start(AMQConnection.java:378)
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:516)
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:545)
    at com.novemberain.langohr.Connection.recoverConnection(Connection.java:166)
    at com.novemberain.langohr.Connection.beginAutomaticRecovery(Connection.java:115)
    at com.novemberain.langohr.Connection.access$000(Connection.java:18)
    at com.novemberain.langohr.Connection$1.shutdownCompleted(Connection.java:93)
    at com.rabbitmq.client.impl.ShutdownNotifierComponent.notifyListeners(ShutdownNotifierComponent.java:75)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:573)
Caused by: com.rabbitmq.client.ShutdownSignalException: connection error; reason: java.io.EOFException
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
    at com.rabbitmq.client.impl.AMQConnection.start(AMQConnection.java:321)
    ... 8 more
Caused by: java.io.EOFException
    at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:273)
    at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:95)
    at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:131)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:533)

It seems likely that the intended restart mechanism provided by Langohr is breaking when it kicks in. Is there an alternative pattern which is preferred in the case of these "hard" restarts? Alternatively, I suppose we have to implement connection monitoring and retries ourselves. Any suggestions would be most welcome.

Upvotes: 5

Views: 461

Answers (1)

Brian Marick

Reputation: 1440

We used to see such stack traces, but we no longer see them with Langohr 2.9.0. After a restart, our Clojure clients reconnect and messages start flowing again.

We are using the defaults, which have connection and topology recovery turned on, as shown by this:

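;; rmq is an alias for langohr.core; infof comes from a logging library (e.g. clojure.tools.logging)
(infof "Automatic recovery enabled? %s" (rmq/automatic-recovery-enabled? connection))
(infof "Topology recovery enabled? %s" (rmq/automatic-topology-recovery-enabled? connection))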
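
If you would rather not depend on the defaults, here is a minimal sketch of requesting recovery explicitly when connecting. The namespace, host name and 5-second delay are placeholders, not values from the question. The option keys (:automatically-recover, :automatically-recover-topology, :network-recovery-delay) are the ones I understand langohr.core/connect to accept, so treat them as an assumption and check them against the documentation for the Langohr version you run.

```clojure
(ns example.recovery
  (:require [langohr.core :as rmq]))

;; Hypothetical settings: host and delay are placeholders for illustration.
(def connection-settings
  {:host "rabbit.example.internal"
   :automatically-recover true            ; reconnect after the connection drops
   :automatically-recover-topology true   ; re-declare queues, bindings and consumers
   :network-recovery-delay 5000})         ; milliseconds to wait before a recovery attempt

(defn open-connection
  "Opens a connection with recovery requested explicitly rather than via defaults."
  []
  (rmq/connect connection-settings))
```

A connection opened this way can be checked with the same rmq/automatic-recovery-enabled? and rmq/automatic-topology-recovery-enabled? calls used in the logging lines above.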

Upvotes: 2
