cjashwell

Reputation: 629

How to implement subscribe model for RabbitMQ and requeue unprocessed messages

I have a Windows service that consumes messages from a remotely hosted RabbitMQ queue.

I have working code that wakes up at fixed intervals, connects to the queue, takes messages and processes them (mostly by persisting them to a database). This is a simplified version of what works for me:

        #region currentmethod

        IConnection connection = factory.CreateConnection();
        IModel channel = connection.CreateModel();
        channel.ExchangeDeclare(EXCHANGE_NAME, ExchangeType.Direct, true, false, null);
        QueueDeclareOk queueDeclareOK = channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: true, arguments: null);
        channel.QueueBind(queue: queueName, exchange: EXCHANGE_NAME, routingKey: routingkey);

        Console.WriteLine(queueDeclareOK.MessageCount.ToString());

        // BasicGet returns null when the queue is empty
        BasicGetResult result = channel.BasicGet(queueName, false);
        if (result != null)
        {
            byte[] body = result.Body;
            string message = Encoding.UTF8.GetString(body);
            if (ProcessData(message))
                channel.BasicAck(result.DeliveryTag, false);
            else
                channel.BasicNack(result.DeliveryTag, false, true);
        }

        channel.Close(200, "goodbye");
        connection.Close();
        #endregion currentmethod

What I would like to do is implement the publish-subscribe model as described at https://www.rabbitmq.com/tutorials/tutorial-three-dotnet.html, so that the subscriber processes messages as soon as they enter the queue. What I need to ensure is that a message that is not successfully processed remains in the queue and is not lost.

Here is code basically along these lines:

        #region subscribeModel
        using (connection = factory.CreateConnection())
        {
            using (channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(EXCHANGE_NAME, ExchangeType.Direct, true, false, null);
                channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: true, arguments: null);  
                channel.QueueBind(queueName, EXCHANGE_NAME, routingKey);

                EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
                consumer.Received += (o, e) =>
                    {
                        string data = Encoding.ASCII.GetString(e.Body);
                        Console.WriteLine(data);                           
                    };

                string consumerTag = channel.BasicConsume(queueName, false, consumer);                 

                Console.WriteLine("Listening, press ENTER to quit");
                Console.ReadLine();                                        
            }
        }

        #endregion subscribeModel

I have three difficulties with this.

  1. After messages are read they are in the Unacked state but I need them to return to the Ready state. If they are successfully processed they should be Acked and leave the queue but if the processing fails they must remain in the queue.

  2. I need to do something other than write the data to the console, but where do I put the Process code in this model?

  3. When my code terminates in the subscription model, the queue that I have connected to is deleted, losing any messages that are in it in any state.

Does anyone know how I can requeue un-processed messages and prevent the queue from being deleted if my consumer closes?

Upvotes: 1

Views: 2043

Answers (1)

cjashwell

Reputation: 629

I've not had a chance to look at this much over the last week, but I did some more research today and I think I can now answer my own question, hopefully helping others along the way.

  1. It doesn't make sense to return messages to the Ready state when using a subscription model. A requeued message is simply delivered to the subscriber again, so a message that keeps failing is retried over and over and holds up the rest of the queue until it is removed. From https://www.rabbitmq.com/confirms.html:

> Positive acknowledgements simply instruct RabbitMQ to record a message as delivered. Negative acknowledgements with basic.reject have the same effect. The difference is primarily in the semantics: positive acknowledgements assume a message was successfully processed while their negative counterpart suggests that a delivery wasn't processed but still should be deleted.

I can achieve what I need to do by sending defective messages to a new queue, where they can be processed differently to account for whatever it is that makes them defective, like so:

            using (channel)
            {

                EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
                consumer.Received += (o, e) =>
                {
                    // UTF8 matches the decoding used in the polling version
                    string data = Encoding.UTF8.GetString(e.Body);
                    result = MQ.utilities.Utilities.ProcessData(data, counter);
                    if (result)
                        channel.BasicAck(e.DeliveryTag, false);
                    else
                    {
                        // Publish the copy first, so the message is not dropped if the publish throws
                        IBasicProperties basicProperties = channel.CreateBasicProperties();
                        channel.BasicPublish(_exchangeName, "newqueue", basicProperties, e.Body);
                        // requeue: false removes the original from this queue
                        channel.BasicNack(e.DeliveryTag, false, false);
                    }
                };

                string consumerTag = channel.BasicConsume(_publishSubscribeQueueOne, false, consumer);

                Console.WriteLine("Listening, press ENTER to quit");
                Console.ReadLine();
            }

  2. The processing code goes in the consumer's Received event handler, as in the code above.

  3. The queue is deleted because it is declared with autoDelete: true, and an auto-delete queue is removed when its last consumer unsubscribes. If the queue is declared with autoDelete: false, it remains, and any unacked messages return to the Ready state when the consumer terminates.
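Putting the three points together, a minimal end-to-end sketch might look like the following. It assumes the same older RabbitMQ.Client API used in the question (IModel, with e.Body as a byte[]). The exchange and queue names, the localhost broker and the ProcessData stub are all hypothetical placeholders, not taken from my real service.

```csharp
using System;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

class Subscriber
{
    // Hypothetical names, chosen only for this sketch.
    const string ExchangeName = "example.exchange";
    const string QueueName = "example.queue";
    const string FailedQueueName = "example.failed";

    // Stand-in for the real processing; returns false when processing fails.
    static bool ProcessData(string message)
    {
        return !string.IsNullOrWhiteSpace(message);
    }

    static void Main()
    {
        // Assumes a broker reachable on localhost with default credentials.
        var factory = new ConnectionFactory { HostName = "localhost" };

        using (IConnection connection = factory.CreateConnection())
        using (IModel channel = connection.CreateModel())
        {
            channel.ExchangeDeclare(ExchangeName, ExchangeType.Direct, true, false, null);

            // autoDelete: false keeps both queues when the last consumer goes away.
            channel.QueueDeclare(queue: QueueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
            channel.QueueBind(QueueName, ExchangeName, QueueName);
            channel.QueueDeclare(queue: FailedQueueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
            channel.QueueBind(FailedQueueName, ExchangeName, FailedQueueName);

            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (sender, e) =>
            {
                // e.Body is a byte[] in the older client assumed here;
                // newer clients expose ReadOnlyMemory<byte> instead.
                string message = Encoding.UTF8.GetString(e.Body);

                if (!ProcessData(message))
                {
                    // Copy the failed message to the second queue before
                    // removing it from the main queue.
                    IBasicProperties props = channel.CreateBasicProperties();
                    props.Persistent = true;
                    channel.BasicPublish(ExchangeName, FailedQueueName, props, e.Body);
                }

                // Ack in both cases so the message leaves the main queue.
                channel.BasicAck(e.DeliveryTag, false);
            };

            // autoAck: false, so messages stay Unacked until the handler acks them.
            channel.BasicConsume(QueueName, false, consumer);

            Console.WriteLine("Listening, press ENTER to quit");
            Console.ReadLine();
        }
    }
}
```

One thing to watch: a queue that already exists with autoDelete: true cannot be redeclared with different settings. The declare fails with a PRECONDITION_FAILED channel error, so the old queue has to be deleted first (or a new queue name used).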

Upvotes: 1
