Reputation: 131
If I am asynchronously listening to Spring AMQP messages how do I respond using the ReplyTo Queue and Correlation ID provided by the sender?
@Override
public void onMessage(Message message) {
byte[] bytes = message.getBody();
String body = new String (bytes);
logger.info(application + " processing message: \n" + body);
//some business logic
//now I want to respond to the replyto queue with the correlation ID
//rabbitTemplate.????
}
Upvotes: 3
Views: 4180
Reputation: 131
Based on the helpful feedback here is the equivalent solution using Spring Messaging:
public void onMessage(Message message) {
//mock response body
String body = "{ \"processing\": \"123456789\"}";
//mock response properties
MessageProperties properties = new MessageProperties();
properties.setContentType(MediaType.APPLICATION_JSON.toString());
properties.setContentEncoding(StandardCharsets.UTF_8.name());
//return the Correlation ID if present
if (message.getMessageProperties().getCorrelationId() != null) {
properties.setCorrelationId(message.getMessageProperties().getCorrelationId());
}
//create and return the response message
Message responseMessage = new Message(body.getBytes(), properties);
rabbitTemplate.send(message.getMessageProperties().getReplyTo(), responseMessage);
logger.info("Processed and returned message");
}
The confusing part for someone new to AMQP is knowing that the reply-to message property is the "Routing Key" value when replying (send or convertAndSend methods). And that temporary queues are created in the default exchange. This is not always the case with a permanent reply-to queue which would require a correlation-id (not necessary with temp reply-to queue).
Upvotes: 2
Reputation: 1906
You can simply call AmqpTemplate.send(routingKey, message).
As a routing key use the reply-to value. The template will publish to the default exchange and route the message to the queue specified as a routing key (which is your reply-to queue).
Also, you may want set the correlation id if the other side is doing request-reponse, so the response could be matched with request.
Upvotes: 0
Reputation: 131
One work-around I found was to extend ChannelAwareMessageListener and constructing the response as follows
@Override
public void onMessage(Message message, Channel channel) {
String body = new String (message.getBody());
String correlationId = new String (message.getMessageProperties().getCorrelationId());
String replyTo = new String (message.getMessageProperties().getReplyTo());
logger.info("Processing message with Correlation ID : "
+ correlationId + " Replying To: " + replyTo + " and Message: \n"
+ body);
BasicProperties basicProperties = new BasicProperties
.Builder()
.correlationId(correlationId)
.contentType(MediaType.APPLICATION_JSON.getType())
.contentEncoding(StandardCharsets.UTF_8.name())
.build();
String response = "{\"response\":" + body + "}";
try {
channel.basicPublish( "", replyTo, basicProperties, response.getBytes());
} catch (IOException e) {
logger.error("Failed to send message: \n" + e.getMessage());
}
logger.info("Processed Message");
}
This works fine but it would be great if I could respond similarly using the RabbitMQTemplate.
Upvotes: 0