Akshat
Reputation: 575

Spring AMQP client hangs

When a RabbitMQ server reaches its memory high watermark, it stops accepting new connections and operations until memory usage drops back below the threshold. In that situation the RabbitMQ client times out cleanly once the connection timeout elapses, but with Spring AMQP the call hangs indefinitely.

Steps to Reproduce

o Create a RabbitMQ HA cluster

o Create a simple program which produces and consumes messages

o Make the RabbitMQ server reach its memory high watermark so that it cannot accept new connections or perform any operations for, say, 10 minutes

o Create Q, Send message from

Spring Binaries Version

a) spring-rabbit-1.6.7.RELEASE.jar

b) spring-core-4.3.6.RELEASE.jar

c) spring-amqp-1.6.7.RELEASE.jar

We also tried upgrading Spring Rabbit and Spring AMQP to version 2.0.2, but it didn't help.

Upvotes: 0

Views: 2014

Answers (1)

Gary Russell
Reputation: 174484

You don't describe what your "RabbitMQ Client" is, but the Java amqp-client uses classic Sockets by default, and Spring AMQP uses that same client, so you should get the same behavior with both. Perhaps you are referring to a client for some other language.

With Java Sockets, when the connection is blocked, the thread is "stuck" in a socket write, which is not interruptible and does not time out.

To handle this condition, you have to use the 4.0 client or above and use NIO.

Here is an example application that demonstrates the technique.

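package com.example;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePropertiesBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.impl.nio.NioParams;

@SpringBootApplication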
public class So48699178Application {

    private static final Logger logger = LoggerFactory.getLogger(So48699178Application.class);

    public static void main(String[] args) {
        SpringApplication.run(So48699178Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(RabbitTemplate template, CachingConnectionFactory ccf) {
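        // Switch the underlying amqp-client factory to NIO; this has to happen
        // before the first connection is opened.
        ConnectionFactory cf = ccf.getRabbitConnectionFactory();
        NioParams nioParams = new NioParams();
        // Give up on a write if the frame cannot be enqueued within 20 seconds.
        nioParams.setWriteEnqueuingTimeoutInMs(20_000);
        cf.setNioParams(nioParams);
        cf.useNio();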
        return args -> {
            Message message = MessageBuilder.withBody(new byte[100_000])
                    .andProperties(MessagePropertiesBuilder.newInstance()
                            .setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT)
                            .build())
                    .build();
            while (true) {
                try {
                    template.send("foo", message);
                }
                catch (Exception e) {
                    logger.info(e.getMessage());
                }
            }

        };

    }

    @Bean
    public Queue foo() {
        return new Queue("foo");
    }

}

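and the resulting log while the broker is blocking the connection (one failure every 20 seconds, matching the configured timeout):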

2018-02-09 12:00:29.803 INFO 9430 --- [ main] com.example.So48699178Application : java.io.IOException: Frame enqueuing failed

2018-02-09 12:00:49.803 INFO 9430 --- [ main] com.example.So48699178Application : java.io.IOException: Frame enqueuing failed

2018-02-09 12:01:09.807 INFO 9430 --- [ main] com.example.So48699178Application : java.io.IOException: Frame enqueuing failed
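
To illustrate why the send fails instead of hanging: as far as I understand it, with NIO the client puts outgoing frames on a bounded write queue and waits only a limited time for space, rather than blocking in a socket write. The following is a conceptual sketch of that idea in plain Java, not the client's actual code; the class name `WriteEnqueueSketch`, the capacity of 1 and the 200 ms timeout are made up for the illustration.

```java
import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration class; not part of amqp-client or Spring AMQP.
public class WriteEnqueueSketch {

    private final BlockingQueue<byte[]> writeQueue;
    private final long enqueueTimeoutMs;

    public WriteEnqueueSketch(int capacity, long enqueueTimeoutMs) {
        this.writeQueue = new ArrayBlockingQueue<>(capacity);
        this.enqueueTimeoutMs = enqueueTimeoutMs;
    }

    // Waits up to the timeout for queue space, then fails instead of blocking forever.
    public void write(byte[] frame) throws IOException {
        try {
            if (!writeQueue.offer(frame, enqueueTimeoutMs, TimeUnit.MILLISECONDS)) {
                throw new IOException("Frame enqueuing failed");
            }
        } catch (InterruptedException e) {
            // Unlike a blocking socket write, the timed wait is interruptible.
            Thread.currentThread().interrupt();
            throw new IOException("Interrupted while enqueuing frame", e);
        }
    }

    public static void main(String[] args) {
        // Nothing drains the queue here, which stands in for a blocked broker.
        WriteEnqueueSketch handler = new WriteEnqueueSketch(1, 200);
        try {
            handler.write(new byte[10]); // fits in the queue
            handler.write(new byte[10]); // queue is full, so this times out
        } catch (IOException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In the real application the caller decides what to do with the exception (retry, back off, or fail the request); the example application above simply logs it and tries again.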

Upvotes: 2