Patryk Maryn

Reputation: 95

Problem with RabbitMQ Direct reply-to with Spring

I'm working on an application that sends a message to a server; the message is then modified and sent back to the amq.rabbitmq.reply-to queue using Direct Reply-to. I've followed the tutorial https://www.rabbitmq.com/direct-reply-to.html but I'm having trouble implementing it. As I understand it, I need to consume messages from the pseudo-queue amq.rabbitmq.reply-to in no-ack mode, which in my case is done by a MessageListenerContainer. Here's my config:

    @Bean
    public Jackson2JsonMessageConverter messageConverter() {
        ObjectMapper mapper = new ObjectMapper();
        return new Jackson2JsonMessageConverter(mapper);
    }

    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }

    @Bean
    public RabbitTemplate rabbitTemplate(final ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(messageConverter());
        rabbitTemplate.setReplyAddress("amq.rabbitmq.reply-to");
        return rabbitTemplate;
    }

    @Bean
    MessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory ) {       
        DirectMessageListenerContainer directMessageListenerContainer = new DirectMessageListenerContainer();
        directMessageListenerContainer.setConnectionFactory(connectionFactory);
        directMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.NONE);
        directMessageListenerContainer.setQueueNames("amq.rabbitmq.reply-to");
        directMessageListenerContainer.setMessageListener(new PracticalMessageListener());
        return directMessageListenerContainer;

    }

The message is sent as JSON in a SEND frame over the STOMP protocol and converted. Then a new queue is created dynamically and added to the MessageListenerContainer. When the message arrives at the broker, I would like to modify it on the server side and send it back to amq.rabbitmq.reply-to, while the original message is sent to the routing key messageTemp.getTo(), which is subscribed to with the SUBSCRIBE frame in STOMP.

  @MessageMapping("/private")
  public void send2(MessageTemplate messageTemp) throws Exception {
      MessageTemplate privateMessage = new MessageTemplate(messageTemp.getPerson(),
              messageTemp.getMessage(), 
              messageTemp.getTo());

      AbstractMessageListenerContainer abstractMessageListenerContainer =
              (AbstractMessageListenerContainer) mlc;

      // here's the queue added to listener container
      abstractMessageListenerContainer.addQueueNames(messageTemp.getTo());

      MessageProperties mp = new MessageProperties();
      mp.setReplyTo("amq.rabbitmq.reply-to");
      mp.setCorrelationId("someId");

      Jackson2JsonMessageConverter smc = new Jackson2JsonMessageConverter();
      Message message = smc.toMessage(messageTemp, mp);

      rabbitTemplate.sendAndReceive(messageTemp.getTo(), message);
  }

The message is modified in the onMessage method when a message is sent to the messageTemp.getTo() routing key:

@Component
public class PracticalMessageListener implements MessageListener {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @Override
    public void onMessage(Message message) {
        System.out.println(("message listener.."));
        String body = "{ \"processing\": \"123456789\"}";
        MessageProperties properties = new MessageProperties();

        // some business logic on the message body

        properties.setCorrelationId(message.getMessageProperties().getCorrelationId());
        Message responseMessage = new Message(body.getBytes(), properties);

        rabbitTemplate.convertAndSend("",
                message.getMessageProperties().getReplyTo(), responseMessage);
    }
}

I may have misunderstood the concept of direct reply-to and the documentation, which says:

Consume from the pseudo-queue amq.rabbitmq.reply-to in no-ack mode. There is no need to declare this "queue" first, although the client can do so if it wants.

The question is: where do I need to consume from that queue? And how can I access the modified message if I'm getting this error:

2020-01-15 22:17:09.688  WARN 96222 --- [pool-1-thread-5] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.

org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener threw exception
Caused by: java.lang.NullPointerException: null
    at com.patrykmaryn.spring.second.PracticalMessageListener.onMessage(PracticalMessageListener.java:50) ~[classes/:na]

This comes from the line where I invoke rabbitTemplate.convertAndSend in PracticalMessageListener.
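A note on the NullPointerException: the config above creates the listener with new PracticalMessageListener(), and a dependency-injection container generally only fills in annotated fields on objects it creates itself, so the rabbitTemplate field of that instance is most likely still null when onMessage runs. The sketch below models this in plain Java. The @Inject annotation, Sender, Listener, and TinyContainer are hypothetical stand-ins invented for illustration; they are not Spring classes.

```java
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.reflect.Field;

// Toy model only: @Inject and TinyContainer are hypothetical stand-ins, not Spring classes.
class InjectionModel {

    @Retention(RetentionPolicy.RUNTIME)
    @interface Inject { }

    /** Plays the role of the collaborator (RabbitTemplate in the question). */
    static class Sender {
        String send(String body) {
            return "sent: " + body;
        }
    }

    /** Plays the role of the message listener. */
    static class Listener {
        @Inject
        Sender sender; // stays null unless a container sets it

        String onMessage(String body) {
            return sender.send(body);
        }
    }

    /** Creates listeners and fills in every field marked with @Inject. */
    static class TinyContainer {
        private final Sender sender = new Sender();

        Listener createListener() {
            Listener listener = new Listener();
            try {
                for (Field field : Listener.class.getDeclaredFields()) {
                    if (field.isAnnotationPresent(Inject.class)) {
                        field.setAccessible(true);
                        field.set(listener, sender);
                    }
                }
            } catch (IllegalAccessException e) {
                throw new IllegalStateException("injection failed", e);
            }
            return listener;
        }
    }

    public static void main(String[] args) {
        Listener managed = new TinyContainer().createListener();
        System.out.println(managed.onMessage("hello")); // prints "sent: hello"

        Listener unmanaged = new Listener(); // created with new: nobody injects the field
        try {
            unmanaged.onMessage("hello");
        } catch (NullPointerException e) {
            System.out.println("NullPointerException: sender was never injected");
        }
    }
}
```

In Spring terms, the usual way out is to let the container create the listener (for example by exposing it from a @Bean method or injecting the @Component instance) rather than calling new on it inside the configuration.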

EDIT

I removed amq.rabbitmq.reply-to from the DirectMessageListenerContainer and implemented a DirectReplyToMessageListenerContainer instead:

@Bean
    DirectReplyToMessageListenerContainer drtmlc (ConnectionFactory connectionFactory) {
        DirectReplyToMessageListenerContainer drtmlc =
                new DirectReplyToMessageListenerContainer(connectionFactory);
        drtmlc.setConnectionFactory(connectionFactory);
        drtmlc.setAcknowledgeMode(AcknowledgeMode.NONE);
        drtmlc.setMessageListener(new DirectMessageListener());
        return drtmlc;
    }

The problem must be in the onMessage method, which doesn't allow any send method to be invoked on rabbitTemplate; I've tried different existing routing keys and exchanges. The listener consumes from the queue defined with routing key messageTemp.getTo().

@Override
    public void onMessage(Message message) {
        System.out.println(("message listener.."));

        String receivedRoutingKey = message.getMessageProperties()
           .getReceivedRoutingKey();
        System.out.println(" This is received routingkey: " + 
            receivedRoutingKey);

           /// ..... rest of code goes here

        rabbitTemplate.convertAndSend("", 
                message.getMessageProperties().getReplyTo(), 
                responseMessage);

Here messageTemp.getTo() is a routing key defined at runtime by selecting a receiver; e.g. if I select 'user1', it prints 'user1'.

This is the first attempt to send a message:

2020-01-16 02:22:20.213 DEBUG 28490 --- [nboundChannel-6] .WebSocketAnnotationMethodMessageHandler : Searching methods to handle SEND /app/private session=45yca5sy, lookupDestination='/private'
2020-01-16 02:22:20.214 DEBUG 28490 --- [nboundChannel-6] .WebSocketAnnotationMethodMessageHandler : Invoking PracticalTipSender#send2[1 args]
2020-01-16 02:22:20.239  INFO 28490 --- [nboundChannel-6] o.s.a.r.l.DirectMessageListenerContainer : SimpleConsumer [queue=user1, consumerTag=amq.ctag-Evyiweew4C-K1mXmy2XqUQ identity=57b19488] started
2020-01-16 02:22:20.268  INFO 28490 --- [nboundChannel-6] o.s.s.c.ThreadPoolTaskScheduler          : Initializing ExecutorService
2020-01-16 02:22:20.269  INFO 28490 --- [nboundChannel-6] .l.DirectReplyToMessageListenerContainer : Container initialized for queues: [amq.rabbitmq.reply-to]
2020-01-16 02:22:20.286  INFO 28490 --- [nboundChannel-6] .l.DirectReplyToMessageListenerContainer : SimpleConsumer [queue=amq.rabbitmq.reply-to, consumerTag=amq.ctag-IXWf-zEyI34xzQSSfbijzg identity=4bedbba5] started

And the second, which fails:

2020-01-16 02:23:20.247 DEBUG 28490 --- [nboundChannel-3] .WebSocketAnnotationMethodMessageHandler : Searching methods to handle SEND /app/private session=45yca5sy, lookupDestination='/private'
2020-01-16 02:23:20.248 DEBUG 28490 --- [nboundChannel-3] .WebSocketAnnotationMethodMessageHandler : Invoking PracticalTipSender#send2[1 args]
2020-01-16 02:23:20.248  WARN 28490 --- [nboundChannel-3] o.s.a.r.l.DirectMessageListenerContainer : Queue user1 is already configured for this container: org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer@3b152928, ignoring add
2020-01-16 02:23:20.250  WARN 28490 --- [nboundChannel-3] o.s.a.r.l.DirectMessageListenerContainer : Queue user1 is already configured for this container: org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer@3b152928, ignoring add
message listener..
 This is received routingkey: user1
2020-01-16 02:23:20.271  WARN 28490 --- [pool-1-thread-5] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.

org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener threw exception

EDIT

Putting the DirectReplyToMessageListenerContainer in a separate class, defining its MessageListener as a @Bean, and likewise passing a @Bean to directMessageListenerContainer.setMessageListener(practicalMessageListener()); seems to have got rid of the NPE. But even though the reply goes to amq.rabbitmq.reply-to.g2dkABVyYWJ....., nothing seems to receive it in the DirectReplyToMessageListenerContainer drtmlc.

@Component
class DirectMessageListener implements MessageListener {
    // This doesn't get invoked...
    @Override
    public void onMessage(Message message) {
        System.out.println("direct reply message sent..");

    }
}

@Component
class ReplyListener {

    @Bean
    public DirectMessageListener directMessageListener() {
        return new DirectMessageListener(); 
    }

    @Bean
    DirectReplyToMessageListenerContainer drtmlc (ConnectionFactory connectionFactory) {
        DirectReplyToMessageListenerContainer drtmlc =
                new DirectReplyToMessageListenerContainer(connectionFactory);
        drtmlc.setConnectionFactory(connectionFactory);
        drtmlc.setAcknowledgeMode(AcknowledgeMode.NONE);
        drtmlc.setMessageListener(directMessageListener());
        return drtmlc;
    }
}

Upvotes: 0

Views: 5158

Answers (1)

Gary Russell

Reputation: 174554

Yes, you have misunderstood the feature.

Each channel gets its own pseudo-queue; you can only receive from that same channel, so a general message listener container won't hack it.

directMessageListenerContainer.setQueueNames("amq.rabbitmq.reply-to");

You simply can't do that.

The framework already supports direct reply-to directly, internally in the RabbitTemplate. The RabbitTemplate has its own DirectReplyToMessageListenerContainer which maintains a pool of channels.

Each request checks out a channel and the reply is returned there and then the channel is returned to the pool for reuse by another request.
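The checkout-and-return cycle described above can be modelled in plain Java. This is a toy sketch under stated assumptions, not Spring AMQP's actual implementation: Channel, Template, idleChannels, and the reply-to.channel-N address format are hypothetical names made up for illustration, and the "server" is just a function called in-process instead of a broker round trip.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.UnaryOperator;

// Toy model only: these classes are hypothetical stand-ins, not Spring AMQP or RabbitMQ types.
class DirectReplyToModel {

    /** Stand-in for an AMQP channel; its reply slot plays the role of the per-channel pseudo-queue. */
    static final class Channel {
        final String replyTo;
        final BlockingQueue<String> replies = new ArrayBlockingQueue<>(1);

        Channel(int id) {
            this.replyTo = "reply-to.channel-" + id; // made-up address format
        }
    }

    /** Stand-in for the template: it owns a fixed pool of channels. */
    static final class Template {
        private final BlockingQueue<Channel> pool;
        private final UnaryOperator<String> server;

        Template(int poolSize, UnaryOperator<String> server) {
            this.pool = new ArrayBlockingQueue<>(poolSize);
            for (int i = 0; i < poolSize; i++) {
                pool.add(new Channel(i));
            }
            this.server = server;
        }

        String sendAndReceive(String request) {
            Channel channel;
            try {
                channel = pool.take(); // check a channel out of the pool
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for a channel", e);
            }
            try {
                // The server's reply is delivered to the requesting channel's own slot...
                channel.replies.add(server.apply(request));
                // ...and can only be read from that same channel.
                return channel.replies.remove();
            } finally {
                pool.add(channel); // return the channel for reuse by another request
            }
        }

        int idleChannels() {
            return pool.size();
        }
    }

    public static void main(String[] args) {
        Template template = new Template(2, String::toUpperCase);
        System.out.println(template.sendAndReceive("test")); // prints TEST
        System.out.println(template.idleChannels());         // prints 2
    }
}
```

The point of the model is that the reply slot belongs to the channel that sent the request, which is why a separate, general-purpose listener container consuming from amq.rabbitmq.reply-to on its own channel never sees the reply.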

Use RabbitTemplate.convertSendAndReceive(); the default behavior (in recent versions) will automatically use direct reply-to.

EDIT

Why not let the framework do all the heavy lifting and you just concentrate on your business logic:

@SpringBootApplication
public class So59760805Application {

    public static void main(String[] args) {
        SpringApplication.run(So59760805Application.class, args);
    }

    @Bean
    public SimpleMessageListenerContainer container(ConnectionFactory cf) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(cf);
        container.setQueueNames("foo");
        container.setMessageListener(new MessageListenerAdapter(new MyListener()));
        return container;
    }

    @Bean
    public MyExtendedTemplate template(ConnectionFactory cf) {
        return new MyExtendedTemplate(cf);
    }

    @Bean
    public ApplicationRunner runner(RabbitTemplate template) {
        return args -> System.out.println(template.convertSendAndReceive("", "foo", "test"));
    }

}

class MyListener {

    public String handleMessage(String in) {
        return in.toUpperCase();
    }

}

class MyExtendedTemplate extends RabbitTemplate {

    MyExtendedTemplate(ConnectionFactory cf) {
        super(cf);
    }

    @Override
    public void onMessage(Message message) {
        System.out.println("Response received (before conversion): " + message);
        super.onMessage(message);
    }

}

The rabbit template uses direct reply-to (internally) by default.

Response received (before conversion): (Body:'TEST' MessageProperties [headers={}, correlationId=1, ...receivedRoutingKey=amq.rabbitmq.reply-to.g2dkAA5yYWJiaXRAZ29sbHVtMgAAeE0AAADmAw==.RQ/uxjR79PX/hZF+7iAdWw==, ...
TEST

Upvotes: 4
