Reputation: 1130
I would like to use RabbitTemplate#sendAndReceive to send a message to a queue and receive multiple messages, not just one. I wanted to extend this behavior by using my own MessageListenerContainer
, but RabbitTemplate
seems to instantiate DirectReplyToMessageListenerContainer
directly.
Currently, an exception is thrown when 2 messages with the same correlation-id arrive at amq.rabbitmq.reply-to
:
org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:1569) ~[spring-rabbit-2.1.0.RELEASE.jar:2.1.0.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1480) ~[spring-rabbit-2.1.0.RELEASE.jar:2.1.0.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1400) ~[spring-rabbit-2.1.0.RELEASE.jar:2.1.0.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1387) ~[spring-rabbit-2.1.0.RELEASE.jar:2.1.0.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1366) ~[spring-rabbit-2.1.0.RELEASE.jar:2.1.0.RELEASE]
at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.callExecuteListener(DirectMessageListenerContainer.java:928) [spring-rabbit-2.1.0.RELEASE.jar:2.1.0.RELEASE]
at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.handleDelivery(DirectMessageListenerContainer.java:917) [spring-rabbit-2.1.0.RELEASE.jar:2.1.0.RELEASE]
at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:149) [amqp-client-5.4.3.jar:5.4.3]
at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:104) [amqp-client-5.4.3.jar:5.4.3]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_152]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_152]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_152]
Caused by: org.springframework.amqp.AmqpRejectAndDontRequeueException: Reply received after timeout
at org.springframework.amqp.rabbit.core.RabbitTemplate.onMessage(RabbitTemplate.java:2270) ~[spring-rabbit-2.1.0.RELEASE.jar:2.1.0.RELEASE]
at org.springframework.amqp.rabbit.listener.DirectReplyToMessageListenerContainer.lambda$setMessageListener$1(DirectReplyToMessageListenerContainer.java:114) ~[spring-rabbit-2.1.0.RELEASE.jar:2.1.0.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1477) ~[spring-rabbit-2.1.0.RELEASE.jar:2.1.0.RELEASE]
... 10 common frames omitted
Is there a different way (if any) to receive multiple responses when using sendAndReceive
methods?
Upvotes: 3
Views: 701
Reputation: 174574
It is not designed to do that; you would need to use a RabbitTemplate.send()
operation and a stand-alone listener container and you would correlate the replies in your code.
EDIT
Here's one way to achieve it (as long as you know how many replies to expect)...
@SpringBootApplication
public class So53206036Application {
public static void main(String[] args) {
SpringApplication.run(So53206036Application.class, args);
}
@Bean
public MultiReplyTemplate rabbitTemplate(ConnectionFactory cf) {
MultiReplyTemplate template = new MultiReplyTemplate();
template.setConnectionFactory(cf);
template.setMessageConverter(listConverter());
return template;
}
@Bean
public ListConverter listConverter() {
return new ListConverter(new SimpleMessageConverter());
}
@RabbitListener(queues = "foo")
public String listen1(String in) {
return in.toUpperCase();
}
@RabbitListener(queues = "bar")
public String listen2(String in) {
return in + in;
}
@Bean
public ApplicationRunner runner(MultiReplyTemplate template) {
return args -> {
List<String> reply = template.convertSendAndReceiveAsType("fanout", "", "foo",
new ParameterizedTypeReference<List<String>>() { });
System.out.println(reply);
};
}
}
class MultiReplyTemplate extends RabbitTemplate {
private static final byte[] NOBODY = new byte[0];
private final Map<String, Message> replies = new HashMap<>();
@Override
public void onMessage(Message message) {
// Not thread-safe but that's ok since the DRTMLC is single-threadded.
String corr = message.getMessageProperties().getCorrelationId();
Message combined = this.replies.get(corr);
if (combined == null) {
combined = new Message(NOBODY, new MessageProperties());
combined.getMessageProperties().getHeaders().put("replies", new ArrayList<Message>());
this.replies.put(corr, combined);
}
@SuppressWarnings("unchecked")
List<Message> list = (List<Message>) combined.getMessageProperties().getHeaders().get("replies");
list.add(message);
if (list.size() == 2) {
this.replies.remove(corr);
combined.getMessageProperties().setCorrelationId(corr);
super.onMessage(combined);
}
}
}
class ListConverter implements SmartMessageConverter {
private final MessageConverter delegate;
ListConverter(MessageConverter delegate) {
this.delegate = delegate;
}
@Override
public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
return this.delegate.toMessage(object, messageProperties);
}
@Override
public Object fromMessage(Message message) throws MessageConversionException {
return this.delegate.fromMessage(message); // for listeners
}
@Override
public Object fromMessage(Message message, Object conversionHint) throws MessageConversionException {
@SuppressWarnings({ "unchecked" })
List<Message> list = (List<Message>) message.getMessageProperties().getHeaders().get("replies");
return list.stream()
.map(m -> this.delegate.fromMessage(m))
.collect(Collectors.toList());
}
}
and
[FOO, foofoo]
Upvotes: 2