Reputation: 138
I recently changed from using a standard Rabbit Template, in my Spring Boot application, to using an Async Rabbit Template. In the process, I switched from the standard send
method to using the sendAndReceive
method.
Making this change does not seem to affect the publishing of messages to RabbitMQ, however I do now see stack traces as follows when sending messages:
org.springframework.amqp.core.AmqpReplyTimeoutException: Reply timed out
at org.springframework.amqp.rabbit.AsyncRabbitTemplate$RabbitFuture$TimeoutTask.run(AsyncRabbitTemplate.java:762) [spring-rabbit-2.3.10.jar!/:2.3.10]
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) [spring-context-5.3.9.jar!/:5.3.9]
I have tried modifying various settings including the reply and receive timeouts but all that changes is the time it takes to receive the above error. I have also tried setting useDirectReplyToContainer
to true
as well as setting useChannelForCorrelation
to true
.
I have managed to recreate the issue in a main method, included bellow, using a RabbitMQ broker running in docker.
public static void main(String[] args) {
com.rabbitmq.client.ConnectionFactory cf = new com.rabbitmq.client.ConnectionFactory();
cf.setHost("localhost");
cf.setPort(5672);
cf.setUsername("<my-username>");
cf.setPassword("<my-password>");
cf.setVirtualHost("<my-vhost>");
ConnectionFactory connectionFactory = new CachingConnectionFactory(cf);
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setExchange("primary");
rabbitTemplate.setUseDirectReplyToContainer(true);
rabbitTemplate.setReceiveTimeout(10000);
rabbitTemplate.setReplyTimeout(10000);
rabbitTemplate.setUseChannelForCorrelation(true);
AsyncRabbitTemplate asyncRabbitTemplate = new AsyncRabbitTemplate(rabbitTemplate);
asyncRabbitTemplate.start();
System.out.printf("Async Rabbit Template Running? %b\n", asyncRabbitTemplate.isRunning());
MessageBuilderSupport<MessageProperties> props = MessagePropertiesBuilder.newInstance()
.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
.setMessageId(UUID.randomUUID().toString())
.setHeader(PUBLISH_TIME_HEADER, Instant.now(Clock.systemUTC()).toEpochMilli())
.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
asyncRabbitTemplate.sendAndReceive(
"1.1.1.csv-routing-key",
new Message(
"a,test,csv".getBytes(StandardCharsets.UTF_8),
props.build()
)
).addCallback(new ListenableFutureCallback<>() {
@Override
public void onFailure(Throwable ex) {
System.out.printf("Error sending message:\n%s\n", ex.getLocalizedMessage());
}
@Override
public void onSuccess(Message result) {
System.out.println("Message successfully sent");
}
});
}
I am sure that I am just missing a configuration option but any help would be appricated.
Thanks. :)
Upvotes: 1
Views: 2603
Reputation: 23
asyncRabbitTemplate.sendAndReceive(..)
will always expect a response from the consumer of the message, hence the timeout you are receiving.
To fire and forget use the standard RabbitTemplate.send(...)
and catching any exceptions in a try/catch block:
try {
rabbitTemplate.send("1.1.1.csv-routing-key",
new Message(
"a,test,csv".getBytes(StandardCharsets.UTF_8),
props.build());
} catch (AmqpException ex) {
log.error("failed to send rabbit message, routing key = {}", routingKey, ex);
}
Upvotes: 1
Reputation: 91
Set reply timeout to some bigger number and see the effect.
rabbitTemplate.setReplyTimeout(60000);
https://docs.spring.io/spring-amqp/reference/html/#reply-timeout
Upvotes: 0