Reputation: 339
Our team has been having some issues with Spring AMQP using cloud foundry connectors. Whenever there is some instability on network, the connection fails, then AMQP seems to try to auto-recover, fails to auto-recover, then creates a new consumer.
The problem is that the RabbitMQ doesn't seem to unregister the old consumers... so we end up having 40 consumers instead of just usual one consumer. And, I think, that because of the amount of invalid consumers, the old consumers receive some messages, but won't run our code. So we have a bunch of unacked messages until application is restarted and clear consumer list.
I'm not sure if this is a misconfiguration from our end, or a bug. Any ideas?
Additional information below.
A fraction of our logs:
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842 com.rabbitmq.client.TopologyRecoveryException: Caught an exception while recovering consumer amq.ctag-Dqik8vz9dSPiUzMbM4uGbw: connection is already closed due to connection error; cause: java.net.SocketException: Connection reset
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842 at java.lang.Thread.run(Thread.java:748) [na:1.8.0_162]
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842 2019-03-04 00:31:47.823 ERROR [Atlas Backend,,,] 23 --- [0.32.27.77:5672] c.r.c.impl.ForgivingExceptionHandler : Caught an exception when recovering topology Caught an exception while recovering consumer amq.ctag-t7_vY47weZNYtWagl__pYA: connection is already closed due to connection error; cause: java.net.SocketException: Connection reset
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842 at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:198) ~[amqp-client-4.0.3.jar!/:4.0.3]
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842 2019-03-04 00:31:47.823 ERROR [Atlas Backend,,,] 23 --- [0.32.27.77:5672] c.r.c.impl.ForgivingExceptionHandler : Caught an exception when recovering topology Caught an exception while recovering consumer amq.ctag-AK1YOpN0_E2KVwSi4fzOaw: connection is already closed due to connection error; cause: java.net.SocketException: Connection reset
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842 at com.rabbitmq.client.impl.AMQConnection.notifyRecoveryCanBeginListeners(AMQConnection.java:693) [amqp-client-4.0.3.jar!/:4.0.3]
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842 at com.rabbitmq.client.impl.AMQConnection.doFinalShutdown(AMQConnection.java:687) [amqp-client-4.0.3.jar!/:4.0.3]
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842 at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:577) [amqp-client-4.0.3.jar!/:4.0.3]
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842 at java.lang.Thread.run(Thread.java:748) [na:1.8.0_162]
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842 2019-03-04 00:31:47.822 ERROR [Atlas Backend,,,] 23 --- [0.32.27.77:5672] c.r.c.impl.ForgivingExceptionHandler : Caught an exception when recovering topology Caught an exception while recovering consumer amq.ctag-vrWhRdhk3ejrXo2-ImjXzA: connection is already closed due to connection error; cause: java.net.SocketException: Connection reset
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842 at com.rabbitmq.client.impl.AMQChannel.rpc(AMQChannel.java:244) ~[amqp-client-4.0.3.jar!/:4.0.3]
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842 at java.lang.Thread.run(Thread.java:748) [na:1.8.0_162]
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842 at java.lang.Thread.run(Thread.java:748) [na:1.8.0_162]
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842 at com.rabbitmq.client.impl.recovery.RecordedConsumer.recover(RecordedConsumer.java:60) ~[amqp-client-4.0.3.jar!/:4.0.3]
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842 ... 7 common frames omitted
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842 at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.recoverConsumers(AutorecoveringConnection.java:657) [amqp-client-4.0.3.jar!/:4.0.3]
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842 at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.access$000(AutorecoveringConnection.java:53) [amqp-client-4.0.3.jar!/:4.0.3]
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842 at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.recoverConsumers(AutorecoveringConnection.java:657) [amqp-client-4.0.3.jar!/:4.0.3]
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842 at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:577) [amqp-client-4.0.3.jar!/:4.0.3]
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842 at com.rabbitmq.client.impl.ChannelN.basicConsume(ChannelN.java:1242) ~[amqp-client-4.0.3.jar!/:4.0.3]
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842 at com.rabbitmq.client.impl.ChannelN.basicConsume(ChannelN.java:1242) ~[amqp-client-4.0.3.jar!/:4.0.3]
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842 at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.recoverConsumers(AutorecoveringConnection.java:657) [amqp-client-4.0.3.jar!/:4.0.3]
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842 at com.rabbitmq.client.impl.AMQConnection.doFinalShutdown(AMQConnection.java:687) [amqp-client-4.0.3.jar!/:4.0.3]
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842 at com.rabbitmq.client.impl.AMQConnection.notifyRecoveryCanBeginListeners(AMQConnection.java:693) [amqp-client-4.0.3.jar!/:4.0.3]
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842 at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:198) ~[amqp-client-4.0.3.jar!/:4.0.3]
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842 at com.rabbitmq.client.impl.recovery.RecordedConsumer.recover(RecordedConsumer.java:60) ~[amqp-client-4.0.3.jar!/:4.0.3]
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842 at com.rabbitmq.client.impl.recovery.RecordedConsumer.recover(RecordedConsumer.java:60) ~[amqp-client-4.0.3.jar!/:4.0.3]
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842 at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.recoverConsumers(AutorecoveringConnection.java:673) [amqp-client-4.0.3.jar!/:4.0.3]
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842 at com.rabbitmq.client.impl.ChannelN.basicConsume(ChannelN.java:1242) ~[amqp-client-4.0.3.jar!/:4.0.3]
a0b279ba-fb73-41cc-bdf9-b430cec52856 1 Mon Mar 04 2019 02:31:47.842 at java.lang.Thread.run(Thread.java:748) [na:1.8.0_162]
We register our consumers using @RabbitListener annotation
@RabbitListener(id = CONSUMER_ID, queues = "${CommonInternalEndPoint}", containerFactory = "rabbitLocalContainerFactory")
protected void process(Object messageObject) {
As we do connect to multiple rabbit mq instances, we need to declare the connection factory manually. Our connection factory comes from cloud foundry connectors:
@Configuration
@Profile("cloud")
public class RabbitCloudConfiguration {
@Bean
public Cloud cloud(){
return new CloudFactory().getCloud();
}
@Bean
@Primary
public ConnectionFactory connectionFactory(){
return cloud().getSingletonServiceConnector(ConnectionFactory.class, null);
}
@Bean("rabbitLocalContainerFactory")
public SimpleRabbitListenerContainerFactory rabbitLocalContainerFactory(){
SimpleRabbitListenerContainerFactory containerFactory = new SimpleRabbitListenerContainerFactory();
containerFactory.setConnectionFactory(connectionFactory());
containerFactory.setAutoStartup(true);
return containerFactory;
}
}
Upvotes: 0
Views: 749
Reputation: 121272
We are using Spring Boot 1.5.9, Framework 4.3.6 , AMQP 1.7.4, RabbitMQ AMQP Client 4.0.3
Starting with version 2.0, Spring AMQP disables an automaticRecoveryEnabled
on the target RabbitMQ ConnectionFactory
: https://docs.spring.io/spring-amqp/docs/2.1.4.RELEASE/reference/#auto-recovery
Since the version you use is based on the RabbitMQ ConnectionFactory
provided by the Cloud Connectors, you need to disable it explicitly:
@Bean
@Primary
public ConnectionFactory connectionFactory(){
ConnectionFactory connectionFactory = cloud().getSingletonServiceConnector(ConnectionFactory.class, null);
((CachingConnectionFactory) connectionFactory).getRabbitConnectionFactory().setAutomaticRecoveryEnabled(false);
return connectionFactory;
}
And your solution is going to be based on the recovery mechanism provided by the Spring AMQP.
Upvotes: 1