Jeremy Deane
Jeremy Deane

Reputation: 131

Spring AMQP Responding to ReplyTo Queue from MessageListener

If I am asynchronously listening to Spring AMQP messages how do I respond using the ReplyTo Queue and Correlation ID provided by the sender?

    @Override
public void onMessage(Message message) {

    byte[] bytes = message.getBody();

    String body = new String (bytes);

    logger.info(application + " processing message: \n" + body);

    //some business logic

    //now I want to respond to the replyto queue with the correlation ID
    //rabbitTemplate.????

}

Upvotes: 3

Views: 4180

Answers (3)

Jeremy Deane
Jeremy Deane

Reputation: 131

Based on the helpful feedback here is the equivalent solution using Spring Messaging:

public void onMessage(Message message) {

//mock response body
String body = "{ \"processing\": \"123456789\"}";

//mock response properties
MessageProperties properties = new MessageProperties();
properties.setContentType(MediaType.APPLICATION_JSON.toString());
properties.setContentEncoding(StandardCharsets.UTF_8.name());       

//return the Correlation ID if present
if (message.getMessageProperties().getCorrelationId() != null) {

    properties.setCorrelationId(message.getMessageProperties().getCorrelationId());
}

//create and return the response message
Message responseMessage = new Message(body.getBytes(), properties);

rabbitTemplate.send(message.getMessageProperties().getReplyTo(), responseMessage);

logger.info("Processed and returned message");

}

The confusing part for someone new to AMQP is knowing that the reply-to message property is the "Routing Key" value when replying (send or convertAndSend methods). And that temporary queues are created in the default exchange. This is not always the case with a permanent reply-to queue which would require a correlation-id (not necessary with temp reply-to queue).

Upvotes: 2

David Siro
David Siro

Reputation: 1906

You can simply call AmqpTemplate.send(routingKey, message).

As a routing key use the reply-to value. The template will publish to the default exchange and route the message to the queue specified as a routing key (which is your reply-to queue).

Also, you may want set the correlation id if the other side is doing request-reponse, so the response could be matched with request.

Upvotes: 0

Jeremy Deane
Jeremy Deane

Reputation: 131

One work-around I found was to extend ChannelAwareMessageListener and constructing the response as follows

@Override
public void onMessage(Message message, Channel channel) {

    String body = new String (message.getBody());

    String correlationId = new String (message.getMessageProperties().getCorrelationId());

    String replyTo = new String (message.getMessageProperties().getReplyTo());

    logger.info("Processing message with Correlation ID : " 
            + correlationId + " Replying To: " + replyTo + " and Message: \n"
            + body);

    BasicProperties basicProperties = new BasicProperties
            .Builder()
            .correlationId(correlationId)
            .contentType(MediaType.APPLICATION_JSON.getType())
            .contentEncoding(StandardCharsets.UTF_8.name())
            .build();

    String response = "{\"response\":" + body + "}";

    try {

        channel.basicPublish( "", replyTo, basicProperties, response.getBytes());

    } catch (IOException e) {

        logger.error("Failed to send message: \n" + e.getMessage());
    }

    logger.info("Processed Message");
}

This works fine but it would be great if I could respond similarly using the RabbitMQTemplate.

Upvotes: 0

Related Questions