jandres

Reputation: 479

Spring AMQP and the correlationId

I have some questions about Spring AMQP and the correlationId of an AMQP message. I have a project with two queues (“queue.A” and “queue.B”) and one MessageListener on each:

public class ServerHandlerQueueA implements MessageListener {

    @Override
    public void onMessage(Message msg) {
        // ...
    }
}

public class ServerHandlerQueueB implements MessageListener {

    @Override
    public void onMessage(Message msg) {
        // ...
    }
}

In some cases, when I receive a message on “queue.A”, I have to redirect it to “queue.B”:

rabbitTemplate.convertAndSend(routingkey, msg, new MessagePostProcessor() 
    { …});

In all cases I send the response to the client using the following:

String routingkey = msg.getMessageProperties().getReplyTo();
rabbitTemplate.convertAndSend(routingkey, respuesta, new MessagePostProcessor() {
    @Override
    public Message postProcessMessage(Message msg) throws AmqpException
    {….}
});

This is working correctly if I use Spring AMQP on the client side:

Object _response = getRabbitOperations().convertSendAndReceive(requestExchange, routingKeyManagement, msg,
new MessagePostProcessor() 
        {
            public Message postProcessMessage(Message message) throws AmqpException 
            {….}
        });

But if I use the plain Java client (on the client side):

RpcClient _rpcClient = new RpcClient(channel, exchangeName, routingKey);
        Response _response = _rpcClient.doCall(new AMQP.BasicProperties.Builder()
                   .contentType("application/json")
                   .deliveryMode(2)
                   .priority(1)
                   .userId("myUser")
                   .appId("MyApp")
                   .replyTo(replyQueueName)
                   .correlationId(corrId)
                   .type("NewOrder")
                   .build(), 
                   messageBodyBytes);

I always get a NullPointerException in:

com.rabbitmq.client.RpcClient$1.handleDelivery(RpcClient.java:195)

I think it's because of how the correlationId is handled. When I send a message with Spring AMQP I can see the “spring_listener_return_correlation” and “spring_request_return_correlation” headers in the consumer, but the “correlationId” property is always null. How can I make the server compatible with both the pure Java client and Spring AMQP? Am I doing something wrong? Thanks!

------ EDIT ----------
I’ve upgraded to Spring AMQP 1.7.4. I send a message like this:

Object respuesta = getRabbitOperations().convertSendAndReceive(requestExchange, routingKey, _object, 
            new MessagePostProcessor() 
            {
                public Message postProcessMessage(Message message) throws AmqpException 
                {
                    message.getMessageProperties().setUserId("myUser");
                    message.getMessageProperties().setType("myType");
                    message.getMessageProperties().setAppId("myApp");
                    message.getMessageProperties().setMessageId(counter.incrementAndGet() + "-myType");
                    message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
                    message.getMessageProperties().setRedelivered(false);

                    return message;
                }
            });

On the server I have:

@Override
public void onMessage(Message msg)
{
    MessageProperties mp = msg.getMessageProperties();
    Gson __gson = new Gson();
    String _stringMP = __gson.toJson(mp);
    System.out.println("MessageProperties:\n" + _stringMP);
}

I think the problem is that the correlationId I receive is always null:

{"headers":{"spring_listener_return_correlation":"49bd0a84-9abb-4719-b8a7-8668a4a77f32","spring_request_return_correlation":"32","__TypeId__":"MyType"},"messageId":"32-MyType","appId":"myApp","type":"MyType","replyTo":"amq.rabbitmq.reply-to.g2dkABByYWJiaXRATkRFUy1QQzAyAAAsMwAAAAgD.ia4+GgHgoeBnajbHxOgW+w\u003d\u003d","contentType":"application/json","contentEncoding":"UTF-8","contentLength":0,"contentLengthSet":false,"priority":0,"redelivered":false,"receivedExchange":"requestExchange","receivedRoutingKey":"inquiry","receivedUserId":"myUser",
"deliveryTag":5,"deliveryTagSet":true,"messageCount":0,"consumerTag":"amq.ctag-4H_P9CbWYZMML-QsmyaQYQ","consumerQueue":"inquiryQueue","receivedDeliveryMode":"NON_PERSISTENT"}

If I use the Java Client I can see the correlationId:

{"headers":{},"appId":"XBID","type":"MyOrders","correlationId":[49], ….

------------ EDIT 2 --------------------------------
I have tried with:

getRabbitOperations().convertAndSend(requestExchange, routingKeyInquiry, 
                _object, 
                new MessagePostProcessor() 
                {
                    public Message postProcessMessage(Message message) throws AmqpException 
                    {
                        message.getMessageProperties().setUserId("myUser");
                        message.getMessageProperties().setType("myType");
                        message.getMessageProperties().setAppId("myApp");
                        message.getMessageProperties().setMessageId(counter.incrementAndGet() + "-myType");
                        message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
                        message.getMessageProperties().setRedelivered(false);
                        message.getMessageProperties().setCorrelationIdString(UUID.randomUUID().toString());
                        return message;
                    }
                });

But the "correlationId" is still always null on the server side.

Upvotes: 0

Views: 802

Answers (1)

Gary Russell

Reputation: 174484

What version are you using?

The return correlation headers have nothing to do with correlationId; they are used to correlate returned (mandatory) requests and replies.

As long as you copy the correlationId and replyTo from the queue.A message to the queue.B message, it should all work ok.
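
To illustrate why both properties need to travel with the forwarded message, here is a rough sketch in plain Java. It is a simplified model only, not Spring AMQP or RabbitMQ client code: `Envelope`, `forward`, `forwardWithoutProperties`, `reply`, `match`, and the `pending` map are all hypothetical names, and it assumes the RPC client matches each reply against a table of pending calls keyed by correlationId.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model: hypothetical stand-ins, NOT Spring AMQP or RabbitMQ client classes.
class CorrelationRelaySketch {

    /** Minimal stand-in for a message: a body plus the two properties that matter for RPC. */
    static final class Envelope {
        final String body;
        final String correlationId;
        final String replyTo;

        Envelope(String body, String correlationId, String replyTo) {
            this.body = body;
            this.correlationId = correlationId;
            this.replyTo = replyTo;
        }
    }

    /** Forward from queue.A to queue.B, carrying correlationId and replyTo across. */
    static Envelope forward(Envelope in) {
        return new Envelope(in.body, in.correlationId, in.replyTo);
    }

    /** Forward that builds a fresh message and loses both properties (the failure case). */
    static Envelope forwardWithoutProperties(Envelope in) {
        return new Envelope(in.body, null, null);
    }

    /** queue.B handler: the reply echoes whatever correlationId the request carried. */
    static Envelope reply(Envelope request, String responseBody) {
        return new Envelope(responseBody, request.correlationId, null);
    }

    /** Client side (assumed behaviour): find the pending call by correlationId, or null if none matches. */
    static String match(Map<String, String> pending, Envelope reply) {
        return pending.get(reply.correlationId);
    }

    public static void main(String[] args) {
        Map<String, String> pending = new HashMap<>();
        pending.put("1", "call-1");
        Envelope request = new Envelope("foo", "1", "reply.queue");

        // Properties copied: the reply finds its waiting call.
        System.out.println(match(pending, reply(forward(request), "FOO")));
        // Properties dropped: nothing is waiting under a null correlationId.
        System.out.println(match(pending, reply(forwardWithoutProperties(request), "FOO")));
    }
}
```

Running `main` prints `call-1` and then `null`. In this model the second reply has no pending call to hand its body to, and a client that dereferences that lookup result without checking it would fail with a NullPointerException, which may be what you are seeing in `RpcClient.handleDelivery`.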

If you can't figure it out, post debug logs from all 3 servers someplace.

EDIT

This works fine for me...

@SpringBootApplication
public class So46316261Application implements CommandLineRunner {

    public static void main(String[] args) {
        SpringApplication.run(So46316261Application.class, args).close();
    }

    @Autowired
    private RabbitTemplate template;

    @Override
    public void run(String... arg0) throws Exception {
        Object reply = this.template.convertSendAndReceive("queue.A", "foo");
        System.out.println(reply);
        Connection conn = this.template.getConnectionFactory().createConnection();
        Channel channel = conn.createChannel(false);
        RpcClient client = new RpcClient(channel, "", "queue.A");
        Response response = client.doCall(new AMQP.BasicProperties.Builder()
                .contentType("text/plain")
                .deliveryMode(2)
                .priority(1)
                .userId("guest")
                .appId("MyApp")
                .replyTo("amq.rabbitmq.reply-to")
                .correlationId("bar")
                .type("NewOrder")
                .build(),
                "foo".getBytes());
        System.out.println(new String(response.getBody()));
        channel.close();
        conn.close();
    }

    @Bean
    public Queue queueA() {
        return new Queue("queue.A");
    }

    @Bean
    public Queue queueB() {
        return new Queue("queue.B");
    }

    @RabbitListener(queues = "queue.A")
    public void listen(Message in) {
        System.out.println(in);
        this.template.send("queue.B", in);
    }

    @RabbitListener(queues = "queue.B")
    public String listenB(Message in) {
        System.out.println(in);
        return "FOO";
    }

}


(Body:'foo' MessageProperties [headers={}, correlationId=1, replyTo=amq.rabbitmq.reply-to.g2dkABByYWJiaXRAbG9jYWxob3N0AAACyAAAAAAB.hp0xZxgVpXcuj9+5QkcOOw==, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=queue.B, deliveryTag=1, consumerTag=amq.ctag-oanHvT3YyUb_Lajl0gpZSQ, consumerQueue=queue.B])
FOO
(Body:'foo' MessageProperties [headers={}, appId=MyApp, type=NewOrder, correlationId=1, replyTo=amq.rabbitmq.reply-to.g2dkABByYWJiaXRAbG9jYWxob3N0AAACzAAAAAAB.okm02YXf0s0HdqZynVIn2w==, contentType=text/plain, contentLength=0, priority=1, redelivered=false, receivedExchange=, receivedRoutingKey=queue.B, deliveryTag=2, consumerTag=amq.ctag-oanHvT3YyUb_Lajl0gpZSQ, consumerQueue=queue.B])
FOO

Upvotes: 1
