apanday
apanday

Reputation: 511

Rabbitmq headers exchange and confirmed delivery

I'm trying to use headers exchange on RabbitMQ, with mixed java and python components, and I need confirmed delivery.

I seem to get different a behaviour from the python (pika) and java clients.

In python:

channel.exchange_declare(exchange='headers_test',
¦   ¦   ¦   ¦   ¦   ¦   ¦type='headers',
¦   ¦   ¦   ¦   ¦   ¦   ¦durable=True)
channel.confirm_delivery()
result = channel.basic_publish(exchange='headers_test',
¦   ¦   ¦   ¦   ¦   ¦ routing_key='',
¦   ¦   ¦   ¦   ¦   ¦ mandatory=True,
¦   ¦   ¦   ¦   ¦   ¦ body=message,
¦   ¦   ¦   ¦   ¦   ¦ properties=pika.BasicProperties(
¦   ¦   ¦   ¦   ¦   ¦   ¦ delivery_mode=2,
¦   ¦   ¦   ¦   ¦   ¦   ¦ headers=message_headers))

If the headers don't match any bound consumer and the message cannot be routed, result is false

But in java/scala:

channel.exchangeDeclare("headers_test", "headers", true, false, null)
channel.confirmSelect

val props = MessageProperties.PERSISTENT_BASIC.builder
¦   ¦   ¦   ¦  .headers(messageHeaders).build
channel.basicPublish("headers_test", 
¦   ¦   ¦   ¦   ¦   ¦"", //routingKey
¦   ¦   ¦   ¦   ¦   ¦true, //mandatory
¦   ¦   ¦   ¦   ¦   ¦props, 
¦   ¦   ¦   ¦   ¦   ¦"data".getBytes)
channel.waitForConfirmsOrDie()

Here, when messageHeaders don't find a match, the message seems to just be dropped without an error.

Am I missing something or the behaviour of both clients really is different? And how can I get confirmed delivery using headers exchange in java?

Note: I already have a "complex" exchange to queues routing setup, I would rather avoid adding dead-letters routing to the game, and just fail-on-send.

Upvotes: 4

Views: 1896

Answers (1)

Vasily Sulatskov
Vasily Sulatskov

Reputation: 354

The problem that a message is considered confirmed even if there's no queue matching your headers. From the docs (https://www.rabbitmq.com/confirms.html):

For unroutable messages, the broker will issue a confirm once the exchange verifies a message won't route to any queue (returns an empty list of queues). If the message is also published as mandatory, the basic.return is sent to the client before basic.ack. The same is true for negative acknowledgements (basic.nack).

Instead you should be checking for a basic.return message to detect if a message has been routed or not.

I've checked with wireshark, and indeed I can see that if a message is not routed there's an AMQP basic.return message.

I supppose you should start with

channel.addReturnListener(new ReturnListener() {
  @Override
  public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
    System.out.println("App.handleReturn");
    System.out.println("replyCode = [" + replyCode + "], replyText = [" + replyText + "], exchange = [" + exchange + "], routingKey = [" + routingKey + "], properties = [" + properties + "], body = [" + body + "]");
  }
});

And indeed if a message hasn't been routed I get this:

replyCode = [312], replyText = [NO_ROUTE], exchange = [headers_logs], routingKey = [], pro....

Furthermore, if you want to emulate Pika's synchronous behavior in Java it seems you can do it by taking a current publish sequence number before publishing a message and registering a confirmation listener instead of relying on .waitForConfirmsOrDie().

So a full code sample would be:

channel.addReturnListener(new ReturnListener() {
      @Override
      public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println("App.handleReturn");
        System.out.println("replyCode = [" + replyCode + "], replyText = [" + replyText + "], exchange = [" + exchange + "], routingKey = [" + routingKey + "], properties = [" + properties + "], body = [" + body + "]");
      }
    });

    channel.addConfirmListener(new ConfirmListener() {
      @Override
      public void handleAck(long deliveryTag, boolean multiple) throws IOException {
        System.out.println("App.handleAck");
        System.out.println("deliveryTag = [" + deliveryTag + "], multiple = [" + multiple + "]");
      }

      @Override
      public void handleNack(long deliveryTag, boolean multiple) throws IOException {
        System.out.println("App.handleNack");
        System.out.println("deliveryTag = [" + deliveryTag + "], multiple = [" + multiple + "]");
      }
});

long nextPublishSeqNo = channel.getNextPublishSeqNo();
System.out.println("nextPublishSeqNo = " + nextPublishSeqNo);

channel.basicPublish("headers_logs",
    "",
     true,
     props,
    "data".getBytes());

And inside of the return/confirm callback you need to look for a channel's publish sequence number you got before publishing a message.

If you look at what happens on the wire, in case if a message hasn't been routed to any queue, RabbitMq sends back a single basic.return message which also contains a confirmation (delivery tag). If a message has been routed, RabbitMq sends back a single bacic.ack message which also contains a confirmation.

It seems that RabbitMq Java client always calls basicReturn() callback before a basicConfirm(), so a logic to determine if a message has been routed or not can be this:

Register return and confirm listeners on a channel; Memorize a channel's next publish sequence number; Wait for either a return or a confirm callback. If it's a return callback - a message has not been routed, and you should ignore a further confirmation for the same delivery tag. If you receive a handleAck() callback before you receive a handleReturn() it means a message has been routed to a queue.

Though I am not sure in which case can .handleNack() be called.

Upvotes: 2

Related Questions