theeranitp
theeranitp

Reputation: 1235

Rabbit MQ unack message not back to queue for consumer to process again

I use RabbitMQ as my queue message server, I use .NET C# client. When there is error in processing message from queue, message will not ackknowleage and still stuck in queue not be processed again as the document I understand.

I don't know if I miss some configurations or block of codes.

My idea now is auto manual ack the message if error and manual push this message to queue again.

I hope to have another better solution.

Thank you so much.

my code

        public void Subscribe(string queueName)
    {
        while (!Cancelled)
        {
            try
            {
                if (subscription == null)
                {
                    try
                    {
                        //try to open connection
                        connection = connectionFactory.CreateConnection();
                    }
                    catch (BrokerUnreachableException ex)
                    {
                        //You probably want to log the error and cancel after N tries, 
                        //otherwise start the loop over to try to connect again after a second or so.
                        log.Error(ex);
                        continue;
                    }

                    //crate chanel
                    channel = connection.CreateModel();
                    // This instructs the channel not to prefetch more than one message
                    channel.BasicQos(0, 1, false);
                    // Create a new, durable exchange
                    channel.ExchangeDeclare(exchangeName, ExchangeType.Direct, true, false, null);
                    // Create a new, durable queue
                    channel.QueueDeclare(queueName, true, false, false, null);
                    // Bind the queue to the exchange
                    channel.QueueBind(queueName, exchangeName, queueName);
                    //create subscription
                    subscription = new Subscription(channel, queueName, false);
                }

                BasicDeliverEventArgs eventArgs;
                var gotMessage = subscription.Next(250, out eventArgs);//250 millisecond
                if (gotMessage)
                {
                    if (eventArgs == null)
                    {
                        //This means the connection is closed.
                        DisposeAllConnectionObjects();
                        continue;//move to new iterate
                    }

                    //process message

                   channel.BasicAck(eventArgs.DeliveryTag, false);


                }
            }
            catch (OperationInterruptedException ex)
            {
                log.Error(ex);
                DisposeAllConnectionObjects();
            }
        }

        DisposeAllConnectionObjects();
    }

    private void DisposeAllConnectionObjects()
    {
        //dispose subscription
        if (subscription != null)
        {
            //IDisposable is implemented explicitly for some reason.
            ((IDisposable)subscription).Dispose();
            subscription = null;
        }

        //dipose channel
        if (channel != null)
        {
            channel.Dispose();
            channel = null;
        }

        //check if connection is not null and dispose it
        if (connection != null)
        {
            try
            {
                connection.Dispose();
            }
            catch (EndOfStreamException ex)
            {
                log.Error(ex);
            }
            catch (OperationInterruptedException ex)//handle this get error from dispose connection 
            {
                log.Error(ex);
            }
            catch (Exception ex)
            {
                log.Error(ex);
            }
            connection = null;
        }
    }

Upvotes: 1

Views: 7610

Answers (1)

Ash
Ash

Reputation: 141

I think you may have misunderstood the RabbitMQ documentation. If a message does not get ack'ed from the consumer, the Rabbit broker will requeue the message onto the queue for consumption. I don't believe your suggested method for ack'ing and then requeuing a message is a good idea, and will just make the problem more complex.

If you want to explicitly "reject" a message because the consumer had a problem processing it, you could use the Nack feature of Rabbit.

For example, within your catch exception blocks, you could use:

subscription.Model.BasicNack(eventArgs.DeliveryTag, false, true);

This will inform the Rabbit broker to requeue the message. Basically, you pass the delivery tag, false to say it is not multiple messages, and true to requeue the message. If you want to reject the message and NOT requeue, just change true to false.

Additionally, you have created a subscription, so I think you should perform your ack's directly on this, not through the channel.

Change:

channel.BasicAck(eventArgs.DeliveryTag, false);

To:

subscription.Ack(); 

This method of ack'ing is much cleaner since you are then keeping everything subscription-related on the subscription object, rather than messing around with the channel that you've already subscribed to.

Upvotes: 10

Related Questions