Thomas Turner
Thomas Turner

Reputation: 138

Reply timeout when using AsyncRabbitTemplate::sendAndReceive - RabbitMQ

I recently changed from using a standard Rabbit Template, in my Spring Boot application, to using an Async Rabbit Template. In the process, I switched from the standard send method to using the sendAndReceive method.

Making this change does not seem to affect the publishing of messages to RabbitMQ, however I do now see stack traces as follows when sending messages:

org.springframework.amqp.core.AmqpReplyTimeoutException: Reply timed out
    at org.springframework.amqp.rabbit.AsyncRabbitTemplate$RabbitFuture$TimeoutTask.run(AsyncRabbitTemplate.java:762) [spring-rabbit-2.3.10.jar!/:2.3.10]
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) [spring-context-5.3.9.jar!/:5.3.9]

I have tried modifying various settings including the reply and receive timeouts but all that changes is the time it takes to receive the above error. I have also tried setting useDirectReplyToContainer to true as well as setting useChannelForCorrelation to true.
I have managed to recreate the issue in a main method, included bellow, using a RabbitMQ broker running in docker.

public static void main(String[] args) {
        com.rabbitmq.client.ConnectionFactory cf = new com.rabbitmq.client.ConnectionFactory();
        cf.setHost("localhost");
        cf.setPort(5672);
        cf.setUsername("<my-username>");
        cf.setPassword("<my-password>");
        cf.setVirtualHost("<my-vhost>");

        ConnectionFactory connectionFactory = new CachingConnectionFactory(cf);

        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setExchange("primary");
        rabbitTemplate.setUseDirectReplyToContainer(true);
        rabbitTemplate.setReceiveTimeout(10000);
        rabbitTemplate.setReplyTimeout(10000);
        rabbitTemplate.setUseChannelForCorrelation(true);

        AsyncRabbitTemplate asyncRabbitTemplate = new AsyncRabbitTemplate(rabbitTemplate);
        asyncRabbitTemplate.start();

        System.out.printf("Async Rabbit Template Running? %b\n", asyncRabbitTemplate.isRunning());

        MessageBuilderSupport<MessageProperties> props = MessagePropertiesBuilder.newInstance()
            .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
            .setMessageId(UUID.randomUUID().toString())
            .setHeader(PUBLISH_TIME_HEADER, Instant.now(Clock.systemUTC()).toEpochMilli())
            .setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);

        asyncRabbitTemplate.sendAndReceive(
            "1.1.1.csv-routing-key",
            new Message(
                "a,test,csv".getBytes(StandardCharsets.UTF_8),
                props.build()
            )
        ).addCallback(new ListenableFutureCallback<>() {
            @Override
            public void onFailure(Throwable ex) {
                System.out.printf("Error sending message:\n%s\n", ex.getLocalizedMessage());
            }

            @Override
            public void onSuccess(Message result) {
                System.out.println("Message successfully sent");
            }
        });
    }

I am sure that I am just missing a configuration option but any help would be appricated.

Thanks. :)

Upvotes: 1

Views: 2603

Answers (2)

hawkeye-73
hawkeye-73

Reputation: 23

asyncRabbitTemplate.sendAndReceive(..) will always expect a response from the consumer of the message, hence the timeout you are receiving.

To fire and forget use the standard RabbitTemplate.send(...) and catching any exceptions in a try/catch block:

try {
        rabbitTemplate.send("1.1.1.csv-routing-key",
        new Message(
            "a,test,csv".getBytes(StandardCharsets.UTF_8),
            props.build());
    } catch (AmqpException ex) {
        log.error("failed to send rabbit message, routing key = {}", routingKey, ex);
    }

Upvotes: 1

Aditya
Aditya

Reputation: 91

Set reply timeout to some bigger number and see the effect.

    rabbitTemplate.setReplyTimeout(60000);

https://docs.spring.io/spring-amqp/reference/html/#reply-timeout

Upvotes: 0

Related Questions