Falco
Falco

Reputation: 1598

RAbbitMQ: How to stop consume, channel, and connection when the queue is empty using EventBasicConsumer

We have some queues, with same number of error queues.

My boss asked me to write a Window Service to consume the error queues, and transfer the data to DB. The Windows Service should be capable of managing which queues should be consumed, based on configurations set in DB table, this means for example that if I want not to consume a specific queue anymore, I must be able to disable the configuration for that specific queue on DB, without stop the Window Service.

I planned to write a Windows Service Job scheduled with help fo Quartz, the Job at every schedule should read the DB configuration, open a single connection, launch tasks for all the queues to consume, consumes the queues with a single channel per task. With this solution I hope to solve because at the end of the task channels will be closed, connection will be closed, error queues will be consumed, and in the next job schedule the number and the name of queue to read could be different. Also, consuming all the messages per single queues at every schedule shuld save the opening/closing connection/channel that could be onerous if I consume a single message per schedule, so the time of schedule should be enough to consume all error messages in queue, before next schedule. Also, scheduling the queues will give me the chance to configure the queues to consume in DB, without the need of stop the Windows Service.

Now I wrote a bit of code to test the solution of consume the single queue

var consumer = new QueueingBasicConsumer(channel);
channel.BasicConsume(queueParName, autoAck: false, consumer: consumer);
bool queueEmpty = false;
while (!queueEmpty)
{
    try
        BasicDeliverEventArgs result;
        bool bRead = consumer.Queue.Dequeue(timeOutQueueEmpty, out result);

        if (bRead)
        {
            var msgBody = Encoding.UTF8.GetString(result.Body);
            // TO DB ...
        }
        else
        {
            queueEmpty = true;
        }
    }
    catch (EndOfStreamException ex)
    {
        // ...
    }
}

The problem is QueueingBasicConsumer is obsolete and in many places is written to prefear EventBasicConsumer

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
    var body = ea.Body;
    var message = Encoding.UTF8.GetString(body);

    // TO DB...
};
channel.BasicConsume(queue: queueParName, autoAck: true, consumer: consumer);

But with EventBasicConsumer I can't understand how to stop the consume when the queue is empty in order to cloes connection and channel

EDIT

A little bit of deeper explanation, as asked in the comment. It would be difficult to put all the code here, because we use a lot of compiled libraries of our company, so the code would be not completely understandable. Anyway, simplifing:

  1. I have an Host project, in which use Quartz Package to schedule a Job every N time.

...

var schedule = SimpleScheduleBuilder.Create();
schedule.WithIntervalInSeconds(ConfigurationManager.AppSettings["..."]);
schedule.RepeatForever();
s.ScheduleQuartzJob(q =>
    q.WithJob(() =>
        JobBuilder.Create<RabbitErrorDequeuerJob>().Build())
            .AddTrigger(() =>
                TriggerBuilder.Create()
                    .WithSchedule(schedule)
                    .Build())
);

...

have Ninject IOC, and in Module file I inject the connection

// *** Ninject disposes every Disposable object that has another scope other than InTransientScope
Bind<IConnection>().ToMethod(x =>
{
IConnectionFactory cnf = new ConnectionFactory();
cnf.Uri = new Uri(ConfigurationManager.AppSettings["..."]);
return cnf.CreateConnection();
}).InCallScope();

I have a Job project, scheduled by Quartz every N Time, in which I have the Execute method (Quartz.IJob interface)

 public void Execute(IJobExecutionContext context)
    {
        try
        {
            List<RabbitQueueConfiguration> lst = //...LIST OF QUEUTE TO DEQUE FROM DATABASE
            foreach (RabbitQueueConfiguration queue in lst)
            {
                Task t = Task.Factory.StartNew(() =>
                {
                    DequeuSingleQueue(queue);
                });
            }
        }
        catch (Exception ex)
        {
            _log.FatalFormat("Error ", ex.Message);
            throw;
        }
    }

In DequeuSingleQueue(queue) there is the core of dequeuing

Upvotes: 0

Views: 4026

Answers (1)

Luke Bakken
Luke Bakken

Reputation: 9627

You should use BasicGet to process messages one-at-a-time.

Upvotes: 0

Related Questions