Reputation: 4156
I am using SpringBoot to start a SpringAMQP application that connect to RabbitMQ queues. I would like to be able to send a message from the producer, specifying the reply-queue so that the consumer would only need to send without having to investigate the destination (hence not having to pass the reply data in the message itself).
this is the configuration I have (shared between producer and consumer)
private static final String QUEUE_NAME = "testQueue";
private static final String ROUTING_KEY = QUEUE_NAME;
public static final String REPLY_QUEUE = "replyQueue";
private static final String USERNAME = "guest";
private static final String PASSWORD = "guest";
private static final String IP = "localhost";
private static final String VHOST = "/";
private static final int PORT = 5672;
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
amqpAdmin().declareQueue(new Queue(QUEUE_NAME));
amqpAdmin().declareQueue(new Queue(REPLY_QUEUE));
return template;
}
@Bean
public AmqpAdmin amqpAdmin() {
return new RabbitAdmin(connectionFactory());
}
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(IP);
connectionFactory.setUsername(USERNAME);
connectionFactory.setPassword(PASSWORD);
connectionFactory.setVirtualHost(VHOST);
connectionFactory.setPort(PORT);
return connectionFactory;
}
I am sending a message as follows :
public Object sendAndReply(String queue, String content){
return template.convertSendAndReceive(queue, new Data(content), new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setReplyTo(ReplyTester.REPLY_QUEUE);
return message;
}
});
}
and awaiting a reply as follows:
public void replyToQueue(String queue){
template.receiveAndReply(queue, new ReceiveAndReplyCallback<Data, Data>() {
@Override
public Data handle(Data payload) {
System.out.println("Received: "+payload.toString());
return new Data("This is a reply for: "+payload.toString());
}
});
}
When sending however, I get the following exception:
Exception in thread "main" org.springframework.amqp.UncategorizedAmqpException: java.lang.IllegalArgumentException: Send-and-receive methods can only be used if the Message does not already have a replyTo property.
at org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:66)
at org.springframework.amqp.rabbit.connection.RabbitAccessor.convertRabbitAccessException(RabbitAccessor.java:112)
at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:841)
at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:820)
at org.springframework.amqp.rabbit.core.RabbitTemplate.doSendAndReceiveWithTemporary(RabbitTemplate.java:705)
at org.springframework.amqp.rabbit.core.RabbitTemplate.doSendAndReceive(RabbitTemplate.java:697)
at org.springframework.amqp.rabbit.core.RabbitTemplate.convertSendAndReceive(RabbitTemplate.java:673)
at org.springframework.amqp.rabbit.core.RabbitTemplate.convertSendAndReceive(RabbitTemplate.java:663)
at prodsend.Prod.sendAndReply(ReplyTester.java:137)
at prodsend.ReplyTester.sendMessages(ReplyTester.java:49)
at prodsend.ReplyTester.main(ReplyTester.java:102)
Caused by: java.lang.IllegalArgumentException: Send-and-receive methods can only be used if the Message does not already have a replyTo property.
at org.springframework.util.Assert.isNull(Assert.java:89)
at org.springframework.amqp.rabbit.core.RabbitTemplate$6.doInRabbit(RabbitTemplate.java:711)
at org.springframework.amqp.rabbit.core.RabbitTemplate$6.doInRabbit(RabbitTemplate.java:705)
at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:835)
... 8 more
the line ReplyTest.137 points to the return
line in the sendAndReply
method above.
EDIT: Here is the Data class that is mentioned above :)
class Data{
public String d;
public Data(String s){ d = s; }
public String toString() { return d; }
}
Upvotes: 2
Views: 4164
Reputation: 61148
From the documentation:
Basic RPC pattern. Send a message to a default exchange with a specific routing key and attempt to receive a response. Implementations will normally set the reply-to header to an exclusive queue and wait up for some time limited by a timeout.
So the method convertSendAndReceive
handles setting the replyTo
header and returns a Messaage
- the response. This is a synchronous pattern - RPC.
If you want to do this asynchronously - which you seem to - do not use this method. Use the appropriate convertAndSend
method and use the appropriate MessagePostProcessor
to add your replyTo
header.
As this is asynchronous, you need to register a separate handler for receiving the reply. This needs to be done before sending the message to the other party. This handler will then be called at some point after sending the message - when is unknown. Read section 3.5.2 Asynchronous Consumer of the Spring AQMP Documentation.
So, asynchronous process flow:
replyTo
queueuereplyTo
setreceiveAndReply
, processes the message, and sends a reply to the replyTo
The synchronous process flow is:
sendAndReceive
and blocksreceiveAndReply
, processes the message, and sends a reply to the replyTo
So the latter case requires the sender to wait. As you are using receiveXXX
rather than registering asynchronous handlers, the sender could be waiting a very long time if the client takes a while to get around to calling receiveXXX
.
Incidentally, if you want to use the synchronous approach but use a specific replyTo
you can always call setReplyQueue
. There is also a setReplyTimeout
for the case I mention where the client either doesn't bother to read the message or forgets to reply.
Upvotes: 6