Reputation: 409
I implemented a consumer that automatically reconnects to the broker after a while if the underlying connection is closed. My scenario is as follows:
Stop the RabbitMQ server; the consumer then shows an exception:
com.rabbitmq.client.ShutdownSignalException: connection error; reason: {#method(reply-code=541, reply-text=INTERNAL_ERROR, class-id=0, method-id=0), null, ""}.
The consumer then sleeps for 60 seconds before reconnecting.
In this case, all messages published before the reconnect are lost. I also performed another experiment.
Note: the consumer's QoS is 1. I have been researching RabbitMQ for several days, and in my understanding the consumer should receive the messages published before the reconnect. Please help (I ran the tests against RabbitMQ on Windows).
Below is the PUBLISHER:
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(this.getHost());
connection = factory.newConnection();
Channel channel = connection.createChannel();
// declare a 'topic' type of exchange
channel.exchangeDeclare(exchangeName, "topic");
// Content-type "application/octet-stream", deliveryMode 2 (persistent), priority zero
channel.basicPublish(exchangeName, routingKey, MessageProperties.PERSISTENT_BASIC, message);
connection.close();
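As an aside, if loss on the publish side is a concern, the Java client also supports publisher confirms, which let the publisher wait until the broker has taken responsibility for the message. A minimal sketch, assuming a local broker; the exchange name, routing key, and payload are placeholders:

```java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

public class ConfirmingPublisher {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost"); // placeholder host
        Connection connection = factory.newConnection();
        try {
            Channel channel = connection.createChannel();
            // durable topic exchange (third argument), so it survives a broker restart
            channel.exchangeDeclare("logs", "topic", true);
            // put the channel into confirm mode
            channel.confirmSelect();
            channel.basicPublish("logs", "kern.info",
                    MessageProperties.PERSISTENT_BASIC, "hello".getBytes("UTF-8"));
            // block until the broker confirms the message, or fail after 5 s
            channel.waitForConfirmsOrDie(5000);
        }
        finally {
            connection.close();
        }
    }
}
```

Note that `waitForConfirmsOrDie` throws if the broker nacks the message, which gives the publisher a chance to retry instead of silently losing it.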
And the CONSUMER is as below:
@Override
public void consume(final String exchangeName, final String queueName, final String routingKey,
        final int qos) throws IOException, InterruptedException {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost(this.getHost());
    while (true) {
        Connection connection = null;
        try {
            connection = factory.newConnection();
            Channel channel = connection.createChannel();
            channel.exchangeDeclare(exchangeName, "topic");
            // declare a durable, non-exclusive, non-autodelete queue
            channel.queueDeclare(queueName, true, false, false, null);
            channel.queueBind(queueName, exchangeName, routingKey);
            // distribute workload among all consumers; each consumer
            // pre-fetches at most {qos} unacknowledged messages
            channel.basicQos(qos);
            logger.debug(" [*] Waiting for messages. To exit press CTRL+C");
            QueueingConsumer consumer = new QueueingConsumer(channel);
            // disable auto-ack: with auto-ack enabled, RabbitMQ removes a
            // message from memory as soon as it is delivered to the consumer
            boolean autoAck = false;
            channel.basicConsume(queueName, autoAck, consumer);
            while (true) {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                try {
                    RabbitMessageConsumer.this.consumeMessage(delivery);
                }
                catch (Exception e) {
                    // the exception shouldn't affect the next message
                    logger.info("[IGNORE]" + e.getMessage());
                }
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        }
        catch (Exception e) {
            logger.warn(e);
        }
        if (autoReconnect) {
            this.releaseConn(connection);
            logger.info("[*] Will try to reconnect to remote host(" + this.getHost() + ") in "
                    + this.reconnectInterval / 1000 + " seconds.");
            Thread.sleep(this.getReconnectInterval());
        }
        else {
            break;
        }
    }
}
private void releaseConn(Connection conn) {
    try {
        if (conn != null) {
            conn.close();
        }
    }
    catch (Exception e) {
        // simply ignore this exception
    }
}
Since it is a 'topic' exchange, no queue is declared by the PUBLISHER. However, at step #3 of the first test, the durable queue had already been declared, and the message is durable as well. I don't understand why messages are lost before the reconnect.
Upvotes: 4
Views: 11307
Reputation: 467
Instead of
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
use
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
(the second argument is the durable flag).
Upvotes: 3
Reputation: 4961
This happened to my RabbitMQ cluster after I upgraded from 3.1 to 3.3. My solution was to remove the /var/lib/rabbitmq/mnesia directory.
Upvotes: 0
Reputation: 7624
If no queue exists at the time of publish, the message is lost. Are you connecting to the same queue? If so, is it durable, or do you recreate it before publishing the message after restarting the RMQ server? It sounds like the solution is one of:
1) declare the queue as durable, so it survives the broker restart, or
2) recreate the queue (and its binding) before publishing again after the restart.
Additionally, make sure your consumer is reconnecting to the correct queue. 1) is probably the better of the two solutions.
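A sketch of option 1): declare the queue durably (and bind it) so it already exists and survives a restart before anything is published. This assumes an open `Channel` named `channel`; the queue, exchange, and routing-key names are placeholders:

```java
// second argument true = durable: the queue definition survives a broker
// restart (persistent messages in it survive as well);
// third/fourth arguments: non-exclusive, non-autodelete
channel.queueDeclare("work-queue", true, false, false, null);
channel.queueBind("work-queue", "logs", "kern.#");
```

Declaring the same queue from both publisher and consumer is safe, since queue declarations in AMQP are idempotent as long as the arguments match.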
Upvotes: 6
Reputation: 409
Oh, I found the cause... The message and queue are certainly durable, but the exchange isn't. Because the exchange isn't durable, the binding between the queue and the exchange is lost when the RabbitMQ broker restarts.
Now that I declare the exchange as durable, the consumer receives messages that were published before the consumer restarted and after the broker restarted.
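The fix amounts to passing true for the durable argument of exchangeDeclare on both the publisher and the consumer side (variable names as in the code above):

```java
// durable topic exchange: the exchange survives a broker restart, and the
// binding between a durable queue and a durable exchange survives with it
channel.exchangeDeclare(exchangeName, "topic", true);
```

Note that a binding only survives a restart if both endpoints are durable; with a transient exchange, the durable queue comes back after the restart but sits unbound, so newly published messages never reach it.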
Upvotes: 8