nsdiv
nsdiv

Reputation: 983

Spring AMQP Consumer mysteriously dropping connection to queue

We're using spring-amqp 1.5.2, with RabbitMQ version 3.5.3. All queues work fine and we have consumers listening on them with no issues, except one consumer which keeps on dropping connections mysteriously. spring-amqp auto recovers, but after a few hours the consumers are disconnected and never come back up.

The queue is declared as

    @Bean()
public Queue analyzeTransactionsQueue(){
    Map<String, Object> args = new HashMap<>();
    args.put("x-message-ttl", 60000);
    return new Queue("analyze.txns", true, false, false, args);
}

Other queues are declared in a similar fashion, and have no issues.

The consumer (listener) is declared as

    @Bean
public SimpleRabbitListenerContainerFactory analyzeTransactionListenerContainerFactory(ConnectionFactory connectionFactory, AsyncTaskExecutor asyncTaskExecutor) {
connectionFactory.getVirtualHost());
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    factory.setConcurrentConsumers(2);
    factory.setMaxConcurrentConsumers(4);
    factory.setTaskExecutor(asyncTaskExecutor);
    ConsumerTagStrategy consumerTagStrategy = new ConsumerTagStrategy() {
        @Override
        public String createConsumerTag(String queue) {
            return queue;
        }
    };
    factory.setConsumerTagStrategy(consumerTagStrategy);
    return factory;
}

Again, other consumers having no issues are declared in a similar fashion.

The code after the message is received has no exceptions. Even after turning on DEBUG logging for SimpleMessageListenerContainer, there are no errors in the logs.

LogLevel=DEBUG; category=org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; msg=Cancelling Consumer: tags=[{}], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,47), acknowledgeMode=AUTO local queue size=0; 
LogLevel=DEBUG; category=org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; msg=Idle consumer terminating: Consumer: tags=[{}], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,47), acknowledgeMode=AUTO local queue size=0; 

Any ideas on why this would be happening. Have tried DEBUG logging but to no avail.

Upvotes: 1

Views: 4083

Answers (2)

Zahari
Zahari

Reputation: 391

Looking at the way you have configured things, it is pretty obvious that you have enabled dynamic scaling of consumers.

factory.setConcurrentConsumers(2);
factory.setMaxConcurrentConsumers(4);

There was a threading issue that I submitted a fix for which caused number of consumers to drop to zero. This was happening while consumers were scaling down.

By the looks of it, you have been a victim of that problem. The fix has been back-ported I believe and can be seen here

Try using the latest version and see whether you get the same problem.

Upvotes: 0

Vincent F
Vincent F

Reputation: 7341

one thing I have observed is that consumer would disconnect if there's an exception during parsing and it doesn't always log the problem, depending on your logging config...

since then, I always wrap the handleDelivery method into a try catch, to get better logging and no connection drop :

consumer = new DefaultConsumer(channel) {

        @Override
        public void handleDelivery(String consumerTag,
                                   Envelope envelope,
                                   AMQP.BasicProperties properties,
                                   byte[] body) throws IOException {

            log.info("processing message - content : " + new String(body, "UTF-8"));

           try {
                MyEvent myEvent = objectMapper.readValue(new String(body, "UTF-8"), MyEvent.class);
                processMyEvent(myEvent);

            } catch (Exception exp) {
                log.error("couldn't process "+MyEvent.class+" message : ", exp);
            }
        }
    }; 

Upvotes: 1

Related Questions