Reputation: 1001
I have added a Received event handler to an EventingBasicConsumer in RabbitMQ. I want to detect when the queue has been fully consumed (every message processed and the queue now empty) so that I can close the consumer and the connection. I cannot find any condition that tells me the queue has been fully processed.
Please help.
public void ProcessQueue(string queueName, Func<string, bool> ProcessMessage)
{
    //lock (this.Model)
    {
        this.Model.BasicQos(0, 1, false);
        EventingBasicConsumer consumer = new EventingBasicConsumer(this.Model);
        consumer.Received += (model, ea) =>
        {
            var body = ea.Body;
            var message = Encoding.UTF8.GetString(body);
            bool processed = ProcessMessage.Invoke(message);
            if (processed)
                this.SendAcknowledgement(ea.DeliveryTag);
            else
                this.StopProcessingQueue(consumer.ConsumerTag);
            // Check if no message for next 2 minutes,
            // Stop Consumer and close connection
        };
        this.Model.BasicConsume(queue: queueName,
                                autoAck: false,
                                consumer: consumer);
    }
}
Upvotes: 3
Views: 2663
Reputation: 379
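I just do a passive queue declare to check how many messages are in the queue. Note that the returned count covers only messages that are ready for delivery, not messages that have been delivered but not yet acknowledged.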
private static int passiveDeclareForMessageCount(IModel model)
{
    // A passive declare does not create or modify the queue; it only
    // returns its current state (or fails if the queue does not exist).
    string queueName = ConfigurationManager.AppSettings["LocalQueueName"];
    var response = model.QueueDeclarePassive(queueName);
    return (int)response.MessageCount;
}
Upvotes: 1
Reputation: 1001
As I have not found any way to stop the RabbitMQ consumer event when the queue is empty, I implemented the method below, which processes messages using the message count obtained from the API:
"localhost:/api/queues"
Below is the function that processes messages until the queue is empty:
/// <summary>
/// (Recommended) Processes the queue up to the number of messages provided.
/// Added to manage the load (process batch by batch).
/// </summary>
/// <param name="queueName">Name of the queue.</param>
/// <param name="HowToProcessMessage">Callback that processes one message; returns true on success.</param>
/// <param name="messageCount">Maximum number of messages to process.</param>
/// <returns>The number of messages successfully processed.</returns>
public uint ProcessQueueByMessageCount(string queueName, Func<string, bool> HowToProcessMessage, uint messageCount)
{
    uint messagesToProcess = messageCount;
    using (var connect = this)
    {
        while (messageCount > 0)
        {
            BasicGetResult result = connect.Model.BasicGet(queueName, false);
            // BasicGet returns null when the queue has no ready messages.
            if (result == null)
                break;
            bool processed = HowToProcessMessage.Invoke(System.Text.Encoding.UTF8.GetString(result.Body));
            if (processed)
            {
                this.SendAcknowledgement(result.DeliveryTag);
                messageCount--;
            }
            else
            {
                // Requeue the failed message and stop processing.
                connect.Model.BasicNack(result.DeliveryTag, false, true);
                break;
            }
        }
    }
    return messagesToProcess - messageCount;
}
Upvotes: 0
Reputation: 71
I could not find any such property either, so I had to implement a timer that resets each time a message is received. If more than 2 minutes elapse without a new message, the timer fires a cleanup method that stops the consumer and closes the connection.
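A minimal sketch of such an idle timer, assuming nothing beyond System.Threading. The IdleWatchdog class and its Reset method are hypothetical names used for illustration; they are not part of the RabbitMQ client.

```csharp
using System;
using System.Threading;

// Hypothetical helper: invokes a callback once when no activity
// has been reported for the configured idle period.
public sealed class IdleWatchdog : IDisposable
{
    private readonly Timer _timer;
    private readonly TimeSpan _idleTimeout;
    private readonly Action _onIdle;
    private int _fired; // 0 = not fired yet, 1 = already fired

    public IdleWatchdog(TimeSpan idleTimeout, Action onIdle)
    {
        _idleTimeout = idleTimeout;
        _onIdle = onIdle ?? throw new ArgumentNullException(nameof(onIdle));
        // One-shot timer: due after idleTimeout, no periodic repeat.
        _timer = new Timer(_ => Fire(), null, idleTimeout, Timeout.InfiniteTimeSpan);
    }

    // Call this whenever a message arrives to restart the countdown.
    public void Reset()
    {
        if (Volatile.Read(ref _fired) == 0)
            _timer.Change(_idleTimeout, Timeout.InfiniteTimeSpan);
    }

    // Runs on a thread-pool thread; the flag ensures a single invocation.
    private void Fire()
    {
        if (Interlocked.Exchange(ref _fired, 1) == 0)
            _onIdle();
    }

    public void Dispose() => _timer.Dispose();
}
```

With the question's code, Reset() would be called inside the Received handler, and the onIdle callback would do the cleanup, for example calling StopProcessingQueue(consumer.ConsumerTag) and then closing the connection, with a timeout of TimeSpan.FromMinutes(2). Because the callback runs on a thread-pool thread, access to the shared channel may need synchronization.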
Upvotes: 0