Reputation: 2071
MessageListenerContainer
with MessageListenerAdapter
implemented as
void handleMessage(MyRpcRequest request, MessageProperties messageProperties) {
var correlationData = new CorrelationData(messageProperties.getCorrelationId());
MessagePostProcessor messagePostProcessor = (m) -> {
m.getMessageProperties().setCorrelationId(correlationData.getId());
if (messagePostProcessorFactory != null) {
messagePostProcessorFactory.create(exchange, routingKey).postProcessMessage(m);
}
return m;
};
MyRpcResponse response = computeResponse(request);
rabbitTemplate.convertAndSend("", messageProperties.getReplyTo(), response, messagePostProcessor, correlationData);
}
I send back straight to default exchange - and it works fine - rpc response is received (more simple is just returning response, without sending it back with rabbit template, but I send it manually on purpose). But this way I cannot wiretap response messages. What I would like to do is sth like below - sending to RESPONSE_EXCHANGE_WHICH_IS_BOUND_TO_DEFAULT_EXCHANGE which is fanout exchange - I can wiretap messages from it, and then I would like it to forward messages to default exchange - I try to bind my response exchange to default exchange but it doesn't work
void handleMessage(MyRpcRequest request, MessageProperties messageProperties) {
....
rabbitTemplate.convertAndSend("RESPONSE_EXCHANGE_WHICH_IS_BOUND_TO_DEFAULT_EXCHANGE",
messageProperties.getReplyTo(), response, messagePostProcessor, correlationData);
....
}
@Bean
FanoutExchange defaultExchange() {
return new FanoutExchange("");
}
@Bean
FanoutExchange myRpcResponseExchange() {
return new FanoutExchange("RESPONSE_EXCHANGE_WHICH_IS_BOUND_TO_DEFAULT_EXCHANGE");
}
@Bean
Binding bindMyRpcResponseExchangeToDefaultExchange(FanoutExchange myRpcResponseExchange, FanoutExchange defaultExchange) {
return BindingBuilder.bind(myRpcResponseExchange).to(defaultExchange);
}
How can I fix my code so that I am able to wiretap response messages?
How can I wiretap response messages to requests that are send as:
rabbitTemplate.convertSendAndReceiveAsType(exchange, routingKey, message, messagePostProcessor, correlationData, responseType);
?
Upvotes: 0
Views: 136
Reputation: 174664
You are not allowed to manually bind anything to the default exchange.
See the management UI - there is no "bind" option.
Nor can you bind the default exchange to another.
You need to publish to the fanout and bind the reply queue to it.
Upvotes: 1