Reputation: 983
We're using spring-amqp 1.5.2, with RabbitMQ version 3.5.3. All queues work fine and we have consumers listening on them with no issues, except one consumer which keeps on dropping connections mysteriously. spring-amqp auto recovers, but after a few hours the consumers are disconnected and never come back up.
The queue is declared as
@Bean()
public Queue analyzeTransactionsQueue(){
Map<String, Object> args = new HashMap<>();
args.put("x-message-ttl", 60000);
return new Queue("analyze.txns", true, false, false, args);
}
Other queues are declared in a similar fashion, and have no issues.
The consumer (listener) is declared as
@Bean
public SimpleRabbitListenerContainerFactory analyzeTransactionListenerContainerFactory(ConnectionFactory connectionFactory, AsyncTaskExecutor asyncTaskExecutor) {
connectionFactory.getVirtualHost());
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setConcurrentConsumers(2);
factory.setMaxConcurrentConsumers(4);
factory.setTaskExecutor(asyncTaskExecutor);
ConsumerTagStrategy consumerTagStrategy = new ConsumerTagStrategy() {
@Override
public String createConsumerTag(String queue) {
return queue;
}
};
factory.setConsumerTagStrategy(consumerTagStrategy);
return factory;
}
Again, other consumers having no issues are declared in a similar fashion.
The code after the message is received has no exceptions. Even after turning on DEBUG logging for SimpleMessageListenerContainer, there are no errors in the logs.
LogLevel=DEBUG; category=org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; msg=Cancelling Consumer: tags=[{}], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,47), acknowledgeMode=AUTO local queue size=0;
LogLevel=DEBUG; category=org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; msg=Idle consumer terminating: Consumer: tags=[{}], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,47), acknowledgeMode=AUTO local queue size=0;
Any ideas on why this would be happening. Have tried DEBUG logging but to no avail.
Upvotes: 1
Views: 4083
Reputation: 391
Looking at the way you have configured things, it is pretty obvious that you have enabled dynamic scaling of consumers.
factory.setConcurrentConsumers(2); factory.setMaxConcurrentConsumers(4);
There was a threading issue that I submitted a fix for which caused number of consumers to drop to zero. This was happening while consumers were scaling down.
By the looks of it, you have been a victim of that problem. The fix has been back-ported I believe and can be seen here
Try using the latest version and see whether you get the same problem.
Upvotes: 0
Reputation: 7341
one thing I have observed is that consumer would disconnect if there's an exception during parsing and it doesn't always log the problem, depending on your logging config...
since then, I always wrap the handleDelivery method into a try catch, to get better logging and no connection drop :
consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
log.info("processing message - content : " + new String(body, "UTF-8"));
try {
MyEvent myEvent = objectMapper.readValue(new String(body, "UTF-8"), MyEvent.class);
processMyEvent(myEvent);
} catch (Exception exp) {
log.error("couldn't process "+MyEvent.class+" message : ", exp);
}
}
};
Upvotes: 1