Artem Malinko

Reputation: 1771

RabbitMQ Java client: is it possible to acknowledge a message on a thread other than the one that received it?

I want to fetch several messages, handle them, and then ack them all together. So basically I receive a message, put it in a queue, and continue receiving messages from RabbitMQ. A different thread monitors this queue of received messages and processes them once enough have accumulated. All the ack examples I've been able to find handle a single message that is processed on the same thread. Like this (from the official docs):

channel.basicQos(1);

final Consumer consumer = new DefaultConsumer(channel) {
  @Override
  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    String message = new String(body, "UTF-8");

    System.out.println(" [x] Received '" + message + "'");
    try {
      doWork(message);
    } finally {
      System.out.println(" [x] Done");
      channel.basicAck(envelope.getDeliveryTag(), false);
    }
  }
};

The documentation also says this:

Channel instances must not be shared between threads. Applications should prefer using a Channel per thread instead of sharing the same Channel across multiple threads. While some operations on channels are safe to invoke concurrently, some are not and will result in incorrect frame interleaving on the wire.

So I'm confused here. If I'm acking a message while the channel is receiving another message from RabbitMQ, does that count as two operations at the same time? It seems to me that it does.

I've tried acknowledging a message on the same channel from a different thread, and it seems to work, but the documentation says I should not share channels between threads. So I tried acknowledging on a different thread with a different channel, but that fails because the delivery tag is unknown to that channel.

Is it possible to acknowledge a message on a thread other than the one that received it?

UPD: An example of what I want. It's in Scala, but I think it's straightforward.

    case class AmqpMessage(envelope: Envelope, msgBody: String)

    val queue = new ArrayBlockingQueue[AmqpMessage](100)

    val consumeChannel = connection.createChannel()
    consumeChannel.queueDeclare(queueName, true, false, true, null)
    consumeChannel.basicConsume(queueName, false, new DefaultConsumer(consumeChannel) {
      override def handleDelivery(consumerTag: String,
                                  envelope: Envelope,
                                  properties: BasicProperties,
                                  body: Array[Byte]): Unit = {
        queue.put(new AmqpMessage(envelope, new String(body)))
      }
    })

    Future {
      // this is different thread
      val channel = connection.createChannel()
      while (true) {
        try {
          val amqpMessage = queue.take()
          channel.basicAck(amqpMessage.envelope.getDeliveryTag, false) // doesn't work
          consumeChannel.basicAck(amqpMessage.envelope.getDeliveryTag, false) // works, but seems like not thread safe
        } catch {
          case e: Exception => e.printStackTrace()
        }
      }
    }

Upvotes: 6

Views: 3085

Answers (3)

David Siro

Reputation: 1906

Although the documentation is pretty restrictive, some operations on channels are safe to invoke concurrently. You may ACK messages from a different thread as long as consuming and acking are the only things you do on that channel.

See this SO question, which deals with the same thing:

RabbitMQ and channels Java thread safety
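For what it's worth, here is a rough sketch of the pattern from the question in plain Java: the consumer thread only queues delivery tags, and a worker thread acks a whole batch with one call. `Acker` is a hypothetical stand-in that has the same shape as `Channel.basicAck(long, boolean)`, so the sketch runs without a broker; in real code you would presumably pass `consumeChannel::basicAck`. The class and method names are made up for illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class BatchAckSketch {

    /** Hypothetical stand-in with the same shape as Channel.basicAck(long, boolean). */
    interface Acker {
        void basicAck(long deliveryTag, boolean multiple) throws IOException;
    }

    /**
     * Waits until batchSize delivery tags are queued, then acknowledges the
     * whole batch with a single basicAck(highestTag, true). Returns that tag.
     */
    static long ackNextBatch(BlockingQueue<Long> tags, int batchSize, Acker acker) {
        List<Long> batch = new ArrayList<>(batchSize);
        try {
            while (batch.size() < batchSize) {
                batch.add(tags.take()); // blocks until the consumer thread queues a tag
            }
            // ... process the messages that belong to these tags here ...
            long highest = Collections.max(batch);
            acker.basicAck(highest, true); // multiple=true acks every tag up to 'highest'
            return highest;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for deliveries", e);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Long> tags = new LinkedBlockingQueue<>();
        // Worker thread: the only place that acks.
        Thread worker = new Thread(() ->
            ackNextBatch(tags, 3, (tag, multiple) ->
                System.out.println("basicAck(" + tag + ", " + multiple + ")")));
        worker.start();
        // Consumer side: handleDelivery would call tags.put(envelope.getDeliveryTag()).
        for (long tag = 1; tag <= 3; tag++) {
            tags.put(tag);
        }
        worker.join();
    }
}
```

Keep in mind that `multiple = true` acknowledges every outstanding delivery on that channel up to the given tag, so this only makes sense if everything below the highest tag has really been processed.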

Upvotes: 2

cantSleepNow

Reputation: 10182

As the docs say: one channel per thread. Beyond that, there are no restrictions.

I would just like to say a few things about your example. What you are trying to do here is wrong. There is no need to wait to ACK the message until you take it from the ArrayBlockingQueue, because once you put it there, it stays there. ACKing it to RMQ has nothing to do with the separate ArrayBlockingQueue.

Upvotes: 0

Bartosz Bilicki

Reputation: 13235

To me, your solution looks correct. You are not sharing channels across threads. You never pass your channel object to another thread; you use it on the same thread that receives the messages.

It is not possible that you are

'acking some message and at the same time the channel is receiving another message from rabbit'

If you are in the handleDelivery method, that thread is blocked by your code and has no chance of receiving another message.

As you found out, you cannot acknowledge a message using a channel other than the one that was used to receive it.

You must acknowledge using the same channel, and you must do that on the same thread that received the message. So you may pass the channel object to other methods and classes, but you must be careful not to pass it to another thread.

I use this solution in my project. It uses a RabbitMQ listener and Spring Integration. For every AMQP message, one org.springframework.integration.Message is created. That message has the AMQP message body as its payload, and the AMQP channel and delivery tag as headers of my org.springframework.integration.Message.

If you want to acknowledge several messages, and they were delivered on the same channel, you should use

channel.basicAck(envelope.getDeliveryTag(), true);

For multiple channels, an efficient algorithm is:

  1. Say you have 100 messages, delivered over 10 channels.
  2. Find the max deliveryTag for each channel.
  3. Invoke channel.basicAck(maxDeliveryTagForThatChannel, true) on each channel.

This way, you need 10 basicAck calls (network round trips), not 100.
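The steps above could be sketched roughly like this in plain Java. The `Delivery` class and the idea of keying by channel number are assumptions made for illustration (in real code you might key the map by the `Channel` object itself), and the final `basicAck` call is only printed here so the sketch runs without a broker.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class MaxTagPerChannel {

    /** Hypothetical record of one delivery: the channel it arrived on and its delivery tag. */
    static final class Delivery {
        final int channelNumber;
        final long deliveryTag;

        Delivery(int channelNumber, long deliveryTag) {
            this.channelNumber = channelNumber;
            this.deliveryTag = deliveryTag;
        }
    }

    /** Step 2: find the highest delivery tag seen on each channel. */
    static Map<Integer, Long> maxTagPerChannel(List<Delivery> deliveries) {
        Map<Integer, Long> maxTags = new TreeMap<>();
        for (Delivery d : deliveries) {
            // Keep whichever tag is larger: the stored one or the new one.
            maxTags.merge(d.channelNumber, d.deliveryTag, Long::max);
        }
        return maxTags;
    }

    public static void main(String[] args) {
        List<Delivery> batch = List.of(
            new Delivery(1, 3L), new Delivery(2, 5L),
            new Delivery(1, 7L), new Delivery(2, 4L));

        // Step 3: one basicAck(maxTag, true) per channel (printed instead of sent).
        maxTagPerChannel(batch).forEach((channel, tag) ->
            System.out.println("channel " + channel + ": basicAck(" + tag + ", true)"));
    }
}
```

Note that with `multiple = true` each call acknowledges all outstanding deliveries on that channel up to the given tag, not only the ones in your batch.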

Upvotes: 0
