mangusbrother
mangusbrother

Reputation: 4156

Reply-To in SpringAMQP being set beforehand?

I am using SpringBoot to start a SpringAMQP application that connect to RabbitMQ queues. I would like to be able to send a message from the producer, specifying the reply-queue so that the consumer would only need to send without having to investigate the destination (hence not having to pass the reply data in the message itself).

this is the configuration I have (shared between producer and consumer)

private static final String QUEUE_NAME = "testQueue";
private static final String ROUTING_KEY = QUEUE_NAME;
public static final String REPLY_QUEUE = "replyQueue";
private static final String USERNAME = "guest";
private static final String PASSWORD = "guest";
private static final String IP = "localhost";
private static final String VHOST = "/";
private static final int PORT = 5672;

@Bean
public RabbitTemplate rabbitTemplate() {
    RabbitTemplate template = new RabbitTemplate(connectionFactory());
    amqpAdmin().declareQueue(new Queue(QUEUE_NAME));
    amqpAdmin().declareQueue(new Queue(REPLY_QUEUE));
    return template;
}

@Bean
public AmqpAdmin amqpAdmin() {
    return new RabbitAdmin(connectionFactory());
}

@Bean
public ConnectionFactory connectionFactory() {
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory(IP);
    connectionFactory.setUsername(USERNAME);
    connectionFactory.setPassword(PASSWORD);
    connectionFactory.setVirtualHost(VHOST);
    connectionFactory.setPort(PORT);
    return connectionFactory;
}

I am sending a message as follows :

public Object sendAndReply(String queue, String content){
        return template.convertSendAndReceive(queue, new Data(content), new MessagePostProcessor() {

            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().setReplyTo(ReplyTester.REPLY_QUEUE);
                return message;
            }
        });
    }

and awaiting a reply as follows:

public void replyToQueue(String queue){
    template.receiveAndReply(queue, new ReceiveAndReplyCallback<Data, Data>() {
        @Override
        public Data handle(Data payload) {
            System.out.println("Received: "+payload.toString());
            return new Data("This is a reply for: "+payload.toString());
        }
    });
}

When sending however, I get the following exception:

Exception in thread "main" org.springframework.amqp.UncategorizedAmqpException: java.lang.IllegalArgumentException: Send-and-receive methods can only be used if the Message does not already have a replyTo property.
    at org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:66)
    at org.springframework.amqp.rabbit.connection.RabbitAccessor.convertRabbitAccessException(RabbitAccessor.java:112)
    at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:841)
    at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:820)
    at org.springframework.amqp.rabbit.core.RabbitTemplate.doSendAndReceiveWithTemporary(RabbitTemplate.java:705)
    at org.springframework.amqp.rabbit.core.RabbitTemplate.doSendAndReceive(RabbitTemplate.java:697)
    at org.springframework.amqp.rabbit.core.RabbitTemplate.convertSendAndReceive(RabbitTemplate.java:673)
    at org.springframework.amqp.rabbit.core.RabbitTemplate.convertSendAndReceive(RabbitTemplate.java:663)
    at prodsend.Prod.sendAndReply(ReplyTester.java:137)
    at prodsend.ReplyTester.sendMessages(ReplyTester.java:49)
    at prodsend.ReplyTester.main(ReplyTester.java:102)
Caused by: java.lang.IllegalArgumentException: Send-and-receive methods can only be used if the Message does not already have a replyTo property.
    at org.springframework.util.Assert.isNull(Assert.java:89)
    at org.springframework.amqp.rabbit.core.RabbitTemplate$6.doInRabbit(RabbitTemplate.java:711)
    at org.springframework.amqp.rabbit.core.RabbitTemplate$6.doInRabbit(RabbitTemplate.java:705)
    at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:835)
    ... 8 more

the line ReplyTest.137 points to the return line in the sendAndReply method above.


EDIT: Here is the Data class that is mentioned above :)

class Data{
    public String d;
    public Data(String s){ d = s; }
    public String toString() { return d; }
}

Upvotes: 2

Views: 4164

Answers (1)

Boris the Spider
Boris the Spider

Reputation: 61148

From the documentation:

Basic RPC pattern. Send a message to a default exchange with a specific routing key and attempt to receive a response. Implementations will normally set the reply-to header to an exclusive queue and wait up for some time limited by a timeout.

So the method convertSendAndReceive handles setting the replyTo header and returns a Messaage - the response. This is a synchronous pattern - RPC.

If you want to do this asynchronously - which you seem to - do not use this method. Use the appropriate convertAndSend method and use the appropriate MessagePostProcessor to add your replyTo header.

As this is asynchronous, you need to register a separate handler for receiving the reply. This needs to be done before sending the message to the other party. This handler will then be called at some point after sending the message - when is unknown. Read section 3.5.2 Asynchronous Consumer of the Spring AQMP Documentation.

So, asynchronous process flow:

  1. sender registers a handler on replyTo queueue
  2. sender sends message with replyTo set
  3. client calls receiveAndReply, processes the message, and sends a reply to the replyTo
  4. sender callback method is triggered

The synchronous process flow is:

  1. sender sends message using sendAndReceive and blocks
  2. client calls receiveAndReply, processes the message, and sends a reply to the replyTo
  3. sender receives the reply, wakes and processes it

So the latter case requires the sender to wait. As you are using receiveXXX rather than registering asynchronous handlers, the sender could be waiting a very long time if the client takes a while to get around to calling receiveXXX.

Incidentally, if you want to use the synchronous approach but use a specific replyTo you can always call setReplyQueue. There is also a setReplyTimeout for the case I mention where the client either doesn't bother to read the message or forgets to reply.

Upvotes: 6

Related Questions