Reputation: 1598
We have some queues, with same number of error queues.
My boss asked me to write a Window Service to consume the error queues, and transfer the data to DB. The Windows Service should be capable of managing which queues should be consumed, based on configurations set in DB table, this means for example that if I want not to consume a specific queue anymore, I must be able to disable the configuration for that specific queue on DB, without stop the Window Service.
I planned to write a Windows Service Job scheduled with help fo Quartz, the Job at every schedule should read the DB configuration, open a single connection, launch tasks for all the queues to consume, consumes the queues with a single channel per task. With this solution I hope to solve because at the end of the task channels will be closed, connection will be closed, error queues will be consumed, and in the next job schedule the number and the name of queue to read could be different. Also, consuming all the messages per single queues at every schedule shuld save the opening/closing connection/channel that could be onerous if I consume a single message per schedule, so the time of schedule should be enough to consume all error messages in queue, before next schedule. Also, scheduling the queues will give me the chance to configure the queues to consume in DB, without the need of stop the Windows Service.
Now I wrote a bit of code to test the solution of consume the single queue
var consumer = new QueueingBasicConsumer(channel);
channel.BasicConsume(queueParName, autoAck: false, consumer: consumer);
bool queueEmpty = false;
while (!queueEmpty)
{
try
BasicDeliverEventArgs result;
bool bRead = consumer.Queue.Dequeue(timeOutQueueEmpty, out result);
if (bRead)
{
var msgBody = Encoding.UTF8.GetString(result.Body);
// TO DB ...
}
else
{
queueEmpty = true;
}
}
catch (EndOfStreamException ex)
{
// ...
}
}
The problem is QueueingBasicConsumer is obsolete and in many places is written to prefear EventBasicConsumer
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
// TO DB...
};
channel.BasicConsume(queue: queueParName, autoAck: true, consumer: consumer);
But with EventBasicConsumer I can't understand how to stop the consume when the queue is empty in order to cloes connection and channel
EDIT
A little bit of deeper explanation, as asked in the comment. It would be difficult to put all the code here, because we use a lot of compiled libraries of our company, so the code would be not completely understandable. Anyway, simplifing:
...
var schedule = SimpleScheduleBuilder.Create();
schedule.WithIntervalInSeconds(ConfigurationManager.AppSettings["..."]);
schedule.RepeatForever();
s.ScheduleQuartzJob(q =>
q.WithJob(() =>
JobBuilder.Create<RabbitErrorDequeuerJob>().Build())
.AddTrigger(() =>
TriggerBuilder.Create()
.WithSchedule(schedule)
.Build())
);
...
have Ninject IOC, and in Module file I inject the connection
// *** Ninject disposes every Disposable object that has another scope other than InTransientScope
Bind<IConnection>().ToMethod(x =>
{
IConnectionFactory cnf = new ConnectionFactory();
cnf.Uri = new Uri(ConfigurationManager.AppSettings["..."]);
return cnf.CreateConnection();
}).InCallScope();
I have a Job project, scheduled by Quartz every N Time, in which I have the Execute method (Quartz.IJob interface)
public void Execute(IJobExecutionContext context)
{
try
{
List<RabbitQueueConfiguration> lst = //...LIST OF QUEUTE TO DEQUE FROM DATABASE
foreach (RabbitQueueConfiguration queue in lst)
{
Task t = Task.Factory.StartNew(() =>
{
DequeuSingleQueue(queue);
});
}
}
catch (Exception ex)
{
_log.FatalFormat("Error ", ex.Message);
throw;
}
}
In DequeuSingleQueue(queue) there is the core of dequeuing
Upvotes: 0
Views: 4026