No1Lives4Ever

Reputation: 6893

RabbitMQ: stop waiting for messages

I'm using RabbitMQ to consume items from a queue. My application is hosted in a Windows Service that is running most of the time.

When the service starts (OnStart), I use the following loop:

while (true)
{
    var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
    // ... Handle this item
}

This consumes items from the queue. The function 'Dequeue()' is blocking: the thread is blocked on this line until an item arrives.

My problem starts when I try to implement the OnStop method of my service. My goal is to stop waiting for new items when the OnStop signal arrives. So I modified my code to something like this:

while (true)
{
    if (this.IsStopping)
        return; // OnStop signal arrived. Stop waiting.
    var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
    // ... Handle this item
}

In some cases, the code above works fine. But if the queue is empty, the thread is already blocked inside 'Dequeue()', so the IsStopping check is never reached and the Windows Service does not stop.

How would you recommend solving this problem?

Upvotes: 1

Views: 1112

Answers (1)

Gabriele Santomaggio

Reputation: 22750

You have two ways to do that. The first one is called a "poison message": while stopping the service, you send a special message to the queue, and the consumer loop exits when it receives it:

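while (true)
{
    var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
    // mymessage is a placeholder for the object deserialized from ea.Body (not shown)
    if (mymessage is MyPoisonMessage)
        break; // poison message received: leave the loop so the service can stop
    // ... Handle this item
}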

I don't really like this approach, but it is a quick solution.

The other way is to extend the class DefaultBasicConsumer and then use the consumer tag to cancel the consumer, something like this:
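To make the poison-message idea concrete, here is a minimal, self-contained sketch that runs without a broker. It is a stand-in, not RabbitMQ code: a BlockingCollection<string> plays the role of the queue, Take() blocks the way Dequeue() does, and the names PoisonMessageDemo, Poison, Consume and Run are hypothetical. In a real service the stop path would publish the poison message to the RabbitMQ queue instead of calling Add().

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

public static class PoisonMessageDemo
{
    // Hypothetical sentinel value; pick something no real producer can send.
    public const string Poison = "__STOP__";

    // Consumer loop: Take() blocks until an item arrives, like Dequeue().
    public static int Consume(BlockingCollection<string> queue)
    {
        int handled = 0;
        while (true)
        {
            string message = queue.Take();
            if (message == Poison)
                break;      // stop signal: leave the loop
            handled++;      // ... handle this item
        }
        return handled;
    }

    public static int Run()
    {
        var queue = new BlockingCollection<string>();
        int handled = -1;
        var worker = new Thread(() => handled = Consume(queue));
        worker.Start();     // plays the role of OnStart

        queue.Add("job-1");
        queue.Add("job-2");

        queue.Add(Poison);  // plays the role of OnStop: unblocks the worker
        worker.Join();      // returns once the loop has exited
        return handled;
    }

    public static void Main()
    {
        Console.WriteLine(Run()); // prints 2
    }
}
```

Two things to keep in mind with this pattern: the poison message is delivered to only one consumer, so a queue with several consumers needs one poison message per consumer, and any messages queued ahead of it are processed before the loop stops.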

class SampleConsumer : DefaultBasicConsumer
{
    public SampleConsumer(IModel channel) : base(channel)
    {
    }

    // Called by the client library for each delivered message; nothing blocks in your own code.
    public override void HandleBasicDeliver(string consumerTag, ulong deliveryTag, bool redelivered,
        string exchange, string routingKey, IBasicProperties properties, byte[] body)
    {
        // ..... (handle the message here)
    }
}

Then instantiate the class, get the consumerTag, and cancel the consumer like this:

channel.BasicCancel(consumerTag);
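A rough sketch of how these pieces could fit into the service is shown below. It assumes the same client generation as the code above (IModel, with BasicConsume returning the consumer tag as a string) and relies on the SampleConsumer class from this answer; QueueWorker, Start, Stop and queueName are hypothetical names, and connection setup is left out.

```csharp
using RabbitMQ.Client;

// Hypothetical wrapper that the Windows Service would own.
public class QueueWorker
{
    private readonly IModel channel;
    private string consumerTag;

    public QueueWorker(IModel channel)
    {
        this.channel = channel;
    }

    // Call from OnStart. Returns immediately; messages then arrive
    // through SampleConsumer.HandleBasicDeliver.
    public void Start(string queueName)
    {
        var consumer = new SampleConsumer(channel);
        // Second argument false: automatic acknowledgement is off, so the
        // consumer is expected to ack each delivery itself.
        consumerTag = channel.BasicConsume(queueName, false, consumer);
    }

    // Call from OnStop. After the cancel, the broker stops sending
    // deliveries to this consumer.
    public void Stop()
    {
        if (consumerTag != null)
            channel.BasicCancel(consumerTag);
    }
}
```

Because nothing in this design sits in a blocking Dequeue() call, OnStop does not have to wake a thread up; it only cancels the subscription.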

Hope it helps.

Upvotes: 2
