ArtisticPhoenix

Reputation: 21671

RabbitMq Unacknowledged Messages after Consumer basic_cancel

Ok, without going into great detail on the whole system I have set up:

The problem I have is that when a consumer cancels (AMQPChannel->basic_cancel) listening to a queue, it leaves one additional message unacknowledged by that worker. It also doesn't trigger the normal callback to process this message.

A few details

I won't go into the exact way that I tell a given consumer to consume or cancel a given queue; starting and cancelling consumption both work perfectly. But when I cancel on a queue that still has messages, the worker just forgets about that last message. I believe the cancel removes the callback for that queue, so there is no way to get that message back short of killing the consumer, which is undesirable.

I did some debugging (just an example, not what I actually do):

   Debug::dump($this->getAmqpChannel()->getMethodQueue());
   $tag = $this->_tags[$queue]; //I keep track of the consumer tag on a queue by queue basis, $queue == {queuename} below
   $this->getAmqpChannel()->basic_cancel( $tag );
   Debug::dump($this->getAmqpChannel()->getMethodQueue());

The output of this is roughly:

  array()
  RunCommand: basic_cancel // this works fine; the consumer forgets the queue, except for ->
  array(1){
    [0] => array(3){
        [0] => string(5) "60,60",
        [1] => string(114) "amq.ctag-D9om-gD_cPevzeon52zpig\0\0\0\0\0\0\0\0\0G{queuename}",  //{queuename} is the name of the queue, which is based on clients information I cant share (such as their name)
       [2] => object(PhpAmqpLib\Message\AMQPMessage)#0 (9) {
            ["DELIVERY_MODE_NON_PERSISTENT":constant] => int(1),
            ["DELIVERY_MODE_PERSISTENT":constant] => int(2),
            ["body":public] => string(1358647) "{ ... "correlation_id":32,"max_correlation_id":38}"
            ["body_size":public] => int(135864),
            ["is_truncated":public] => bool(false),
            ["content_encoding":public] => null,
            ["propertyDefinitions":protected static] => array(14){ ... }
            ["delivery_info":public] => array(0){},
            ["prop_types":protected] => array(14){ ... }
      }
    }
  }

Once the worker dies (or rather, once I kill it), the message gets put back on the queue, and I can pull it out in the RabbitMq Management plugin under Get Messages. And there it is:

  Properties
    correlation_id: 32:38
    delivery_mode:  2
    content_encoding:   text/plain
    content_type:   application/json

The "correlation_id":32,"max_correlation_id":38 corresponds to the correlation_id: 32:38 because I need to keep track of the message parts. So I know that is the same message.

So why, after I cancel, do I get that last message stuck in zombie land, and is there any way to kick it back out into the queue short of killing the consumer?

Also, this is not a one-off; it happens every time I cancel on a queue that still has messages in it, so it has nothing to do with a given message. It's like the consumer gets one last prefetched message, and because it's cancelled there is no callback for that last one to run, so it's just stuck in limbo. Remember that a prefetch of 0 means fetch all messages; 1 is the lowest you can set.
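For reference, here is a minimal, self-contained sketch of the kind of setup where I see this (assumes php-amqplib; the connection details and queue name are placeholders, not my real code):

```php
<?php
require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel    = $connection->channel();

// prefetch_count = 1: the broker pushes at most one unacked message at a time
$channel->basic_qos(0, 1, false);

$tag = $channel->basic_consume(
    'example_queue', // placeholder queue name
    '',              // let the server generate the consumer tag
    false,           // no_local
    false,           // no_ack = false, so explicit acks are required
    false,           // exclusive
    false,           // nowait
    function (AMQPMessage $msg) use ($channel) {
        // normal processing path
        $channel->basic_ack($msg->delivery_info['delivery_tag']);
    }
);

// process a few messages, then cancel mid-stream
for ($i = 0; $i < 3; $i++) {
    $channel->wait();
}
$channel->basic_cancel($tag);

// a prefetched delivery can still be sitting in the channel's internal
// method queue with no callback left to handle it
var_dump($channel->getMethodQueue());
```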

Any help would be splendid.

UPDATE

I may have a solution by calling

 $this->getAmqpChannel()->basic_recover(true); //basic_recover($requeue)

Either just before or after the basic_cancel

This requeues the message, and I can even test $this->getAmqpChannel()->getMethodQueue() as shown above to see if the $queue that I am cancelling has a message bottled up (not implemented yet).

I was trying to avoid using recover, but I think it should be OK because the consumers use a single channel and are blocking. At worst it would requeue a valid message one time, which, though not ideal, should be acceptable.
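Putting that together, the workaround looks roughly like this (a sketch, not production code; the getMethodQueue() check is the part I have not implemented yet):

```php
// Cancel the consumer, then ask the broker to requeue anything that was
// delivered on this channel but never acknowledged.
$channel = $this->getAmqpChannel();
$channel->basic_cancel($this->_tags[$queue]);

if (count($channel->getMethodQueue()) > 0) {
    // a prefetched message is bottled up with no callback to handle it
    $channel->basic_recover(true); // $requeue = true
}
```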

However, in some instances I get an additional exception from Rabbit:

  PRECONDITION_FAILED - unknown delivery tag {n}

If anyone has any details on this additional error, that would be great. Also, all the queues require ack; none of them are auto-ack.

UPDATE1

I noticed queue_unbind in the stack trace, so what I did is internally track the bindings; this way I can ensure that the unbind is only done once. I will post some code after I do some more testing tomorrow, but my initial testing after implementing that did not produce the error anymore.
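The binding bookkeeping is roughly along these lines (illustrative only; the property and method names here are made up, not the final code I'll post):

```php
// Track active bindings so queue_unbind is never issued twice for the
// same queue/exchange/routing-key combination.
protected $_bindings = [];

protected function bindQueue($queue, $exchange, $routingKey)
{
    $key = "$queue|$exchange|$routingKey";
    if (!isset($this->_bindings[$key])) {
        $this->getAmqpChannel()->queue_bind($queue, $exchange, $routingKey);
        $this->_bindings[$key] = true;
    }
}

protected function unbindQueue($queue, $exchange, $routingKey)
{
    $key = "$queue|$exchange|$routingKey";
    if (isset($this->_bindings[$key])) {
        $this->getAmqpChannel()->queue_unbind($queue, $exchange, $routingKey);
        unset($this->_bindings[$key]);
    }
}
```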

All this might sound kind of "strange", and I could explain why and what I am doing with all this, but that's probably beyond the scope of the question. I will say that I have used this system in production for over 2 years (I engineered it) and we can do 180k searches a minute (about 100 if you consider all the parts of the system). We have also done over 280 billion searches with it since I built it. We are also the leading company in our industry, having either eliminated our competitors or they send us their stuff and no longer do it in house. This is in large part because of our fast turnaround and the quality of our data. So this system does work, and it works very well.

But in recent audits I noticed the daily consumers were only having to deal with about 10 million rows (roughly 100 minutes of work), whereas the nightly consumers deal with around 100 million rows (or about 20 hours of work). The daily consumers can do nightly jobs, but only outside of business hours (because it reduces response times during the day), so there is about a 10-hour window where the nightly jobs run only on a much smaller and less capable server. The solution this gives us is that if there are no daily jobs (jobs submitted by clients), the consumers can dynamically swap to the nightly stuff (data warehoused) on the fly. This should keep most of the responsiveness while not wasting resources when no jobs are being submitted. We can horizontally scale as much as we want on the searches, but we pay a lot for our main server, and we were wasting about 8 hours of work we could do.

I could probably fill a small book with how it all works, but hopefully that gives some basic idea of what I am doing. I am also bound by some non-disclosure and non-compete clauses in my contract, so I can't really get into specific details.

Upvotes: 2

Views: 3053

Answers (1)

theMayer

Reputation: 16167

Consumer in the RabbitMQ phraseology means a subscriber on the queue. (See this answer for details on the differences between channel, consumer, and connection).

When you turn on acknowledgements, they are turned on for the channel. Any messages delivered on that channel will have a delivery tag associated with them. When you finish processing a message, you need to tell the server via that same channel that the message was processed. Cancelling a consumer has no effect on acknowledgement of messages already delivered. In fact, it would be a perfectly valid use case to receive a message, cancel the consumer, process the message, then send an acknowledgement.

Thus, you have two options. You can leave the message unacknowledged, in which case all you have to do is close the channel and it will be re-queued at the head of the queue. Or, you can settle it (either a nack or an ack), in which case the message will be re-queued if nack'd, or dropped if ack'd.
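In php-amqplib terms, the two options look roughly like this ($channel, $tag, and $msg stand in for your own variables):

```php
// Option 1: leave the message unacknowledged and close the channel;
// the broker re-queues it at the head of the queue.
$channel->basic_cancel($tag);
$channel->close();

// Option 2: settle the delivery explicitly after cancelling.
$channel->basic_cancel($tag);
$deliveryTag = $msg->delivery_info['delivery_tag'];
$channel->basic_ack($deliveryTag);                  // done with it: drop it
// ...or re-queue it instead:
// $channel->basic_nack($deliveryTag, false, true); // $multiple = false, $requeue = true
```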

If I remember correctly, NOT specifying a prefetch count (via basic.qos) will result in prefetch of zero, meaning you have to ack the previous message before receiving the next message. I could be wrong on this. Of course, if you use a basic.get, you avoid this problem altogether with very little performance impact.
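The pull-based alternative would look something like this ('example_queue' is a placeholder):

```php
// basic_get fetches one message at a time on demand, so nothing is
// prefetched past the point where you stop asking.
$msg = $channel->basic_get('example_queue'); // returns null if the queue is empty
if ($msg !== null) {
    // ...process the message...
    $channel->basic_ack($msg->delivery_info['delivery_tag']);
}
```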

Upvotes: 0
