Shivang MIttal
Shivang MIttal

Reputation: 1001

Stop Rabbit MQ consumer Event If the Queue is Empty

I have added a Received Event Handler of EventingBasicConsumer in RabbitMQ. I am trying to check if the Queue is consumed(processed and now empty), It should Close the consumer and connection. I am not able to find the condition which can tell if the Queue is processed.

Please help

    public void ProcessQueue(string queueName, Func<string, bool> ProcessMessage)
    {
        //lock (this.Model)
        {
            this.Model.BasicQos(0, 1, false);
            EventingBasicConsumer consumer = new EventingBasicConsumer(this.Model);

            consumer.Received += (model, ea) =>
            {
                var body = ea.Body;
                var message = Encoding.UTF8.GetString(body);
                bool processed = ProcessMessage.Invoke(message);
                if (processed)
                    this.SendAcknowledgement(ea.DeliveryTag);
                else
                    this.StopProcessingQueue(consumer.ConsumerTag);

                // Check if no message for next 2 minutes, 
                //      Stop Consumer and close connection

            };

            this.Model.BasicConsume(queue: queueName,
                             autoAck: false,
                             consumer: consumer);
        }
    }

Upvotes: 3

Views: 2663

Answers (3)

drzounds
drzounds

Reputation: 379

I just do a passive queue create to peek at how many messages are in the queue

 private static int passiveDeclareForMessageCount(IModel model)
    {

        Dictionary<string, object> args = new Dictionary<string, object>();
        args.Add("x-queue-mode", "lazy");
        int resultCount = 0;
        var response = model.QueueDeclarePassive(ConfigurationManager.AppSettings["LocalQueueName"].ToString());
        resultCount = (int)response.MessageCount;

        return resultCount;
    }

Upvotes: 1

Shivang MIttal
Shivang MIttal

Reputation: 1001

As I have not found any way to stop Rabbit MQ consumer Event if the queue is Empty, I have implemented below method to process messages by passing the message count from API

"localhost:/api/queues"

Below is the function to process messages till queue is empty

/// <summary>
/// (Recommanded) Processes the queue till the number of messages provided.
/// Added to manage the load (process batches by batches)
/// </summary>
/// <param name="queueName">Name of the queue.</param>
/// <param name="ProcessMessage">The process message.</param>
/// <param name="count">The count.</param>
public uint ProcessQueueByMessageCount(string queueName, Func<string, bool> HowToProcessMessage, uint messageCount)
{
    uint messagesToProcess = messageCount;
    using (var connect = this)
    {
        while (messageCount > 0)
        {
            BasicGetResult result = connect.Model.BasicGet(queueName, false);
            bool processed = HowToProcessMessage.Invoke(System.Text.Encoding.UTF8.GetString(result.Body));
            if (processed)
            {
                this.SendAcknowledgement(result.DeliveryTag);
                messageCount--;
            }
            else
            {
                connect.Model.BasicNack(result.DeliveryTag, false, true);
                break;
            }
        }
    }
    return messagesToProcess - messageCount;
}

Upvotes: 0

Dhejo
Dhejo

Reputation: 71

I could not find any property either so had to implement a timer which will reset each time a message is received and in case the elapsed time more than the 2 minutes you can fire a cleanup method which will stop the consumer and close connection

Upvotes: 0

Related Questions