Shuki

Reputation: 310

RabbitMQ: Consumers for SAC queues are not reconnected after a node in a cluster is restarted

Setup: a 3-node RabbitMQ cluster (running in Docker), behind HAProxy.

Version:

Spring Boot (1.5.4) app with 3 queues:

  1. defined as "exclusive", durable, auto-delete is false
  2. defined as "SAC" (single active consumer), durable, auto-delete is false
  3. classic, durable, auto-delete is false

Policy:

[screenshot of the policy definition]

Scenario:

  1. When the application first starts, the queues are registered correctly.
  2. I bring down a node at random. If it is the master node, mirroring is triggered and one of the mirror nodes becomes master. All fine so far.
  3. When I bring that node back up, I start to get exceptions in the application logs:

Logs:

Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - failed to perform operation on queue 'single-active-consumer-message-queue' in vhost '/dev' due to timeout, class-id=50, method-id=10)
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66) ~[amqp-client-4.0.2.jar!/:4.0.2]
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:32) ~[amqp-client-4.0.2.jar!/:4.0.2]
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:366) ~[amqp-client-4.0.2.jar!/:4.0.2]
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:229) ~[amqp-client-4.0.2.jar!/:4.0.2]
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:117) ~[amqp-client-4.0.2.jar!/:4.0.2]
    ... 25 common frames omitted

It attempts to reconnect 3 times, then eventually prints the log below:

2021-03-18 17:08:55.487 ERROR 1 --- [cTaskExecutor-4] o.s.a.r.l.SimpleMessageListenerContainer : Stopping container from aborted consumer
2021-03-18 17:08:55.487  INFO 1 --- [cTaskExecutor-4] o.s.a.r.l.SimpleMessageListenerContainer : Waiting for workers to finish.
2021-03-18 17:08:55.487  INFO 1 --- [cTaskExecutor-4] o.s.a.r.l.SimpleMessageListenerContainer : Successfully waited for workers to finish.

In the RabbitMQ console I see this:

[screenshot of the RabbitMQ console]

The CachingConnectionFactory is defined with only the basic connection details of the HAProxy instance. The listener container factory is defined as follows:

@Bean
protected SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory, RetryOperationsInterceptor retryAdvice) {
    SimpleRabbitListenerContainerFactory containerFactory = new SimpleRabbitListenerContainerFactory();
    containerFactory.setConnectionFactory(connectionFactory);
    containerFactory.setDefaultRequeueRejected(false);
    containerFactory.setAdviceChain(retryAdvice);
    containerFactory.setMessageConverter(new Jackson2JsonMessageConverter());
    return containerFactory;
}

Upvotes: 1

Views: 946

Answers (2)

Shuki

Reputation: 310

This behavior was caused by a bug in RabbitMQ. It seems to be fixed in version 3.8.17.

https://github.com/rabbitmq/rabbitmq-server/issues/3072

Upvotes: 2

Gary Russell

Reputation: 174729

Boot 1.5.x and Spring AMQP 1.7.x are end of life and no longer supported.

That said, the following applies to 1.7.x too.

This situation will occur if queue recovery takes longer than 15 seconds (by default: 3 retries at 5-second intervals).

This is controlled by two container properties:

/**
 * Set the number of retries after passive queue declaration fails.
 * @param declarationRetries The number of retries, default 3.
 * @since 1.3.9
 * @see #setFailedDeclarationRetryInterval(long)
 */
public void setDeclarationRetries(int declarationRetries) {
    this.declarationRetries = declarationRetries;
}

/**
 * Set the interval between passive queue declaration attempts in milliseconds.
 * @param failedDeclarationRetryInterval the interval, default 5000.
 * @since 1.3.9
 */
public void setFailedDeclarationRetryInterval(long failedDeclarationRetryInterval) {
    this.failedDeclarationRetryInterval = failedDeclarationRetryInterval;
}

You can increase one or both of these to prevent the container from stopping under this condition.
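As a rough sizing aid, the arithmetic behind the 15-second default can be sketched in plain Java. This is only an illustration: the class and method names below (`DeclarationRetryWindow`, `retryWindow`, `retriesNeeded`) are hypothetical and not part of Spring AMQP, and the window is approximated as retries multiplied by the interval, ignoring the time each declaration attempt itself takes.

```java
import java.time.Duration;

// Hypothetical helper, not part of Spring AMQP: estimates how long the
// container keeps retrying passive queue declaration before it stops.
public class DeclarationRetryWindow {

    // Defaults taken from the Javadoc quoted above.
    static final int DEFAULT_DECLARATION_RETRIES = 3;
    static final long DEFAULT_RETRY_INTERVAL_MS = 5000L;

    // Approximate retry window: number of retries times the interval between them.
    static Duration retryWindow(int declarationRetries, long retryIntervalMs) {
        return Duration.ofMillis(Math.multiplyExact((long) declarationRetries, retryIntervalMs));
    }

    // Smallest retry count whose window is at least as long as the expected
    // queue recovery time, for a given interval (ceiling division).
    static int retriesNeeded(Duration expectedRecovery, long retryIntervalMs) {
        long recoveryMs = expectedRecovery.toMillis();
        long retries = (recoveryMs + retryIntervalMs - 1) / retryIntervalMs;
        return (int) Math.max(retries, 1);
    }

    public static void main(String[] args) {
        // Default settings give the 15-second window mentioned above.
        System.out.println(retryWindow(DEFAULT_DECLARATION_RETRIES, DEFAULT_RETRY_INTERVAL_MS)); // PT15S

        // Assumed example: if recovery may take up to 60 seconds, keep the
        // 5-second interval and raise the retry count accordingly.
        System.out.println(retriesNeeded(Duration.ofSeconds(60), DEFAULT_RETRY_INTERVAL_MS)); // 12
    }
}
```

The resulting values would then be passed to setDeclarationRetries and setFailedDeclarationRetryInterval on the listener container. The 60-second recovery time is an assumed figure; measure how long queue recovery actually takes in your cluster and add some margin.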

Regarding the expected behavior of Single Active Consumer queues under this condition, I suggest you ask on the rabbitmq-users Google group.

Upvotes: 0
