bastiat

Reputation: 2071

How to wiretap rabbitmq rpc response message

I have a MessageListenerContainer with a MessageListenerAdapter whose handler method is implemented as:

void handleMessage(MyRpcRequest request, MessageProperties messageProperties) {
    var correlationData = new CorrelationData(messageProperties.getCorrelationId());
    MessagePostProcessor messagePostProcessor = (m) -> {
        m.getMessageProperties().setCorrelationId(correlationData.getId());
        if (messagePostProcessorFactory != null) {
            messagePostProcessorFactory.create(exchange, routingKey).postProcessMessage(m);
        }
        return m;
    };
    MyRpcResponse response = computeResponse(request);
    rabbitTemplate.convertAndSend("", messageProperties.getReplyTo(), response, messagePostProcessor, correlationData);
}

I send the reply straight to the default exchange, and that works: the RPC response is received. (Simply returning the response from the handler would be simpler, but I send it manually with the RabbitTemplate on purpose.) The problem is that I cannot wiretap response messages this way. What I would like to do is something like the code below: send to RESPONSE_EXCHANGE_WHICH_IS_BOUND_TO_DEFAULT_EXCHANGE, a fanout exchange that I can wiretap, and have it forward the messages on to the default exchange. I tried binding my response exchange to the default exchange, but it doesn't work:

void handleMessage(MyRpcRequest request, MessageProperties messageProperties) {
    ....
    rabbitTemplate.convertAndSend("RESPONSE_EXCHANGE_WHICH_IS_BOUND_TO_DEFAULT_EXCHANGE",
        messageProperties.getReplyTo(), response, messagePostProcessor, correlationData); 
    ....
}

    @Bean
    FanoutExchange defaultExchange() {
        return new FanoutExchange("");
    }

    @Bean
    FanoutExchange myRpcResponseExchange() {
        return new FanoutExchange("RESPONSE_EXCHANGE_WHICH_IS_BOUND_TO_DEFAULT_EXCHANGE");
    }

    @Bean
    Binding bindMyRpcResponseExchangeToDefaultExchange(FanoutExchange myRpcResponseExchange, FanoutExchange defaultExchange) {
        return BindingBuilder.bind(myRpcResponseExchange).to(defaultExchange);
    }

How can I fix my code so that I can wiretap response messages?

In other words, how can I wiretap the responses to requests that are sent as:

rabbitTemplate.convertSendAndReceiveAsType(exchange, routingKey, message, messagePostProcessor, correlationData, responseType);

Upvotes: 0

Views: 136

Answers (1)

Gary Russell

Reputation: 174664

You are not allowed to manually bind anything to the default exchange.

See the management UI - there is no "bind" option.

Nor can you bind the default exchange to another.

Instead, publish the reply to the fanout exchange and bind the reply queue to it; a wiretap queue bound to the same exchange then receives a copy of every reply.
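
As a rough sketch of that topology in Spring AMQP (not a definitive setup): the exchange and queue names below are hypothetical, and the sketch assumes the client uses a fixed, named reply queue, since as far as I know the direct reply-to pseudo-queue (amq.rabbitmq.reply-to) cannot be bound to an exchange.

```java
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
class RpcResponseWiretapConfig {

    // Hypothetical name; replaces the attempt to bind to the default exchange.
    @Bean
    FanoutExchange myRpcResponseExchange() {
        return new FanoutExchange("my.rpc.response");
    }

    // Hypothetical named reply queue the RPC client listens on.
    @Bean
    Queue rpcReplyQueue() {
        return new Queue("my.rpc.replies");
    }

    // Hypothetical queue that receives a copy of every reply (the wiretap).
    @Bean
    Queue rpcWiretapQueue() {
        return new Queue("my.rpc.replies.wiretap");
    }

    // A fanout exchange ignores the routing key, so both queues get each reply.
    @Bean
    Binding bindReplyQueue() {
        return BindingBuilder.bind(rpcReplyQueue()).to(myRpcResponseExchange());
    }

    @Bean
    Binding bindWiretapQueue() {
        return BindingBuilder.bind(rpcWiretapQueue()).to(myRpcResponseExchange());
    }
}
```

With this in place, the handler would publish with rabbitTemplate.convertAndSend("my.rpc.response", "", response, messagePostProcessor, correlationData) instead of sending to "". On the client side, my understanding is that the RabbitTemplate then needs setReplyAddress("my.rpc.replies") plus a reply listener container on that queue. Note that with a fanout, every bound reply queue sees every reply, so if several clients each have their own reply queue, a direct or topic exchange keyed on the reply-to value may be a better fit.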

Upvotes: 1
