Reputation: 575
There is a behavior in RabbitMQ server that it will not accept subsequent connections / operations when it reaches to watermark value till the time it rebalances itself. RabbitMQ client elegantly gets timeout when such situations happen after the connection timeout , But we are using Spring AMQP it continues to hang.
Steps to Reproduce
o Create a RabbitMQ HA cluster
o Create a simple program which produces and consumes message
o Make RabbitMQ server reach high watermark value in memory so that it cannot accept any new connections or perform any operations say for 10 min
o Create Q, Send message from
Spring Binaries Version
a) spring-rabbit-1.6.7.RELEASE.jar
b) spring-core-4.3.6.RELEASE.jar
c) spring-amqp-1.6.7.RELEASE.jar
We tried upgrading to Spring Rabbit and AMQP 2.0.2 version as well , But it didn’t helped.
Upvotes: 0
Views: 2014
Reputation: 174484
You don't describe what your "RabbitMQ Client" is, but the java amqp-client
uses classic Sockets by default. So you should get the same behavior with both (since Spring AMQP uses that client). Perhaps you are referring to some other language client.
With java Socket
s, when the connection is blocked, the thread is "stuck" in socket write which is not interruptible, nor does it timeout.
To handle this condition, you have to use the 4.0 client or above and use NIO.
Here is an example application that demonstrates the technique.
@SpringBootApplication
public class So48699178Application {
private static Logger logger = LoggerFactory.getLogger(So48699178Application.class);
public static void main(String[] args) {
SpringApplication.run(So48699178Application.class, args);
}
@Bean
public ApplicationRunner runner(RabbitTemplate template, CachingConnectionFactory ccf) {
ConnectionFactory cf = ccf.getRabbitConnectionFactory();
NioParams nioParams = new NioParams();
nioParams.setWriteEnqueuingTimeoutInMs(20_000);
cf.setNioParams(nioParams);
cf.useNio();
return args -> {
Message message = MessageBuilder.withBody(new byte[100_000])
.andProperties(MessagePropertiesBuilder.newInstance()
.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT)
.build())
.build();
while (true) {
try {
template.send("foo", message);
}
catch (Exception e) {
logger.info(e.getMessage());
}
}
};
}
@Bean
public Queue foo() {
return new Queue("foo");
}
}
and
2018-02-09 12:00:29.803 INFO 9430 --- [ main] com.example.So48699178Application : java.io.IOException: Frame enqueuing failed
2018-02-09 12:00:49.803 INFO 9430 --- [ main] com.example.So48699178Application : java.io.IOException: Frame enqueuing failed
2018-02-09 12:01:09.807 INFO 9430 --- [ main] com.example.So48699178Application : java.io.IOException: Frame enqueuing failed
Upvotes: 2