Reputation: 600
I have recently started diving into RabbitMQ. I have created a Windows Service using the RabbitMQ .net library that functions as my consumer. This consumer will be used to handle bulk processes like sending out big batches of emails, etc.
I have built it by implementing the SimpleRpcServer
class that is part of the RabbitMQ .net library, and overriding the HandleCall/HandleCast
methods. Everything works great in terms of consuming and processing messages. We have began looking into deployment options that we can use to deploy this Windows Service to servers at Amazon Web Services. When deploying updates to the windows service, the service must be stopped, updated, then started again.
My question is: What can I do so that when the Stop
event is fired on the Windows Service, the service either waits for all currently delivered messages to the Consumer to finish processing AND requeue any messages that have been delivered but have not started processing yet.
Here is some sample code:
public partial class ExampleService: ServiceBase
{
private List<Task> _watcherTasks = new List<Task>();
protected override void OnStart(string[] args)
{
var factory = new ConnectionFactory() {
HostName = _hostname,
VirtualHost = _virtualHost,
UserName = _username,
Password = _password,
Ssl = new SslOption
{
Enabled = true,
ServerName = _hostname,
AcceptablePolicyErrors = SslPolicyErrors.RemoteCertificateNameMismatch |
SslPolicyErrors.RemoteCertificateChainErrors
},
RequestedHeartbeat = 30
};
conn = factory.CreateConnection();
var emailQueue = requestChannel.QueueDeclare("email", false, false, false, null);
var emailSub = new Subscription(requestChannel, emailQueue);
var emailServer = new ServiceBusConsumer(emailSub);
Task emailWatcher = Task.Run(() => emailServer.MainLoop());
_watcherTasks.Add(emailWatcher);
}
protected override void OnStop()
{
conn.Close();
Task.WaitAll(_watcherTasks.ToArray(), 60000);
}
}
public class ServiceBusConsumer : SimpleRpcServer
{
public ServiceBusConsumer(Subscription subscription) : base(subscription)
{
}
public override void HandleSimpleCast(bool isRedelivered, RabbitMQ.Client.IBasicProperties requestProperties, byte[] body)
{
try
{
//Uses some reflection and invokes function to process the message here.
}
catch (Exception ex)
{
//Creates event log entry of exception
}
}
}
Upvotes: 2
Views: 1620
Reputation: 1606
You can't explicitly push messages back to a Queue after they've been consumed. Terminating the process that has consumed the message should cause the message to be re-queued after a timeout period, provided that no Acknowledgement has been sent, and that your Queue is configured to receive acknowledgements. Consider though, that this may interrupt the ordering of messages in your Queue.
For example, let's say that you send messages M1 and M2, where M2 is dependent on M1. If you terminate your consumer(s) while M1 is being processed, and while M2 is still in the Queue, M1 will be re-queued behind M2. When your consumer restarts, these messages will be processed out-of-order.
Trying to achieve this introduces too much complexity. I would simply allow your Windows Service to finish processing its de-queued message(s), and then simply stop listening to the Queue, and terminate gracefully.Your code should look something like this:
while (!stopConsuming) {
try {
BasicDeliverEventArgs basicDeliverEventArgs;
var messageIsAvailable = consumer.Queue.Dequeue(timeout, out basicDeliverEventArgs);
if (!messageIsAvailable) continue;
var payload = basicDeliverEventArgs.Body;
var message = Encoding.UTF8.GetString(payload);
OnMessageReceived(new MessageReceivedEventArgs {
Message = message,
EventArgs = basicDeliverEventArgs
});
if (implicitAck && !noAck) channel.BasicAck(basicDeliverEventArgs.DeliveryTag, false);
}
catch (Exception exception) {
OnMessageReceived(new MessageReceivedEventArgs {
Exception = new AMQPConsumerProcessingException(exception)
});
if (!catchAllExceptions) Stop();
}
}
}
In this example, the stopConsuming
variable (mark as volatile
if accessing from another thread) will determine whether or not the Windows Service will continue reading messages from the Queue, after it has finished processing the current message. If set to true
, the loop will end, and no more messages will be de-queued.
I'm about to deploy a C# library that achieves exactly what you're asking for, among other things. Please check my blog, insidethecpu.com for details and tutorials. You might also be interested in this.
Upvotes: 2