Reputation: 119
I have created a simple Windows service to consume messages from an Azure service bus queue. I used Topshelf
to create the Windows service.
The code snippet below follows the example from Microsoft Learn: Quickstart: Send and receive messages from an Azure Service Bus queue (.NET).
var hf = HostFactory.New(x =>
{
x.Service<ServiceBusHelper>(s =>
{
s.ConstructUsing(serviceProvider.GetService<ServiceBusHelper>);
s.WhenStarted(async service => await service.ReceiveMessagesAsync());
s.WhenStopped(async service => await service.Stop());
});
x.RunAsNetworkService()
.StartAutomatically()
.EnableServiceRecovery(rc => rc.RestartService(1));
x.SetServiceName("MyWindowsService");
x.SetDisplayName("MyWindowsService");
x.SetDescription("MyWindowsService");
});
hf.Run();
ServiceBusHelper class:
public async Task ReceiveMessagesAsync()
{
var connectionString = _configuration.GetValue<string>("ServiceBusConnectionString");
var queueName = _configuration.GetValue<string>("ServiceBusQueueName");
await using (ServiceBusClient client = new ServiceBusClient(connectionString))
{
ServiceBusProcessor processor = client.CreateProcessor(queueName, new ServiceBusProcessorOptions());
processor.ProcessMessageAsync += MessageHandler;
processor.ProcessErrorAsync += ErrorHandler;
await processor.StartProcessingAsync();
System.Threading.Thread.Sleep(1000);//Wait for a minute before stop processing
await processor.StopProcessingAsync();
}
}
public async Task MessageHandler(ProcessMessageEventArgs args)
{
string body = args.Message.Body.ToString();
var messageBytes = Encoding.ASCII.GetBytes(body);
ProcessMessage(messageBytes);
await args.CompleteMessageAsync(args.Message);
}
public Task ErrorHandler(ProcessErrorEventArgs args)
{
return Task.CompletedTask;
}
public Task Stop()
{
return Task.CompletedTask;
}
Window service gets installed successfully and the status shows that it is running. However, it would not automatically consume the messages from the Service Bus.
If I manually stop and start the service, it will pick up the messages from the queue.
Not sure what am I missing with this implementation.
Upvotes: 1
Views: 4705
Reputation: 2561
.NetCore 3.1 introduced a new extension to work along side Microsoft.AspNetCore.Hosting Adding NuGet package Microsoft.Extensions.Hosting.WindowsServices you can add .UseWindowsService(). this will allow you run this as a windows service or Console app.
public static IHostBuilder CreateHostBuilder(string[] args) =>
Host.CreateDefaultBuilder(args)
.UseWindowsService()
.ConfigureAppConfiguration((context, config) =>
{
// configure the app here.
})
.ConfigureServices((hostContext, services) =>
{
services.AddHostedService<QueueWorker>();
}).UseSerilog();
}
you can then create a background worker to start and stop processing the servicebus queue. Here is my implementaion:
public class QueueWorker : BackgroundService, IDisposable
{
protected ILogger<QueueWorker> _logger;
protected IQueueMessageReceiver _queueProcessor;
public QueueWorker()
{
}
public QueueWorker(ILogger<QueueWorker> logger, IQueueMessageReceiver queueMessageReceiver)
{
_logger = logger;
_queueProcessor = queueMessageReceiver;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
await Task.CompletedTask.ConfigureAwait(false);
}
public override Task StartAsync(CancellationToken cancellationToken)
{
_logger.LogInformation("Service Starting");
var task = _queueProcessor.StartProcessor(cancellationToken);
task.Wait();
if (task.IsFaulted)
{
throw new Exception("Unable to start Processor");
}
return base.StartAsync(cancellationToken);
}
public override async Task StopAsync(CancellationToken cancellationToken)
{
_logger.LogInformation("Stopping Service");
await _queueProcessor.StopProcessor().ConfigureAwait(false);
await base.StopAsync(cancellationToken).ConfigureAwait(false);
}
public override void Dispose()
{
_logger.LogInformation("Disposing Service");
var loopCount = 0;
while (_queueProcessor != null && !_queueProcessor.IsClosedOrClosing() && loopCount < 5)
{
var task = Task.Delay(600);
task.Wait();
loopCount++;
}
base.Dispose();
GC.SuppressFinalize(this);
}
And The actual processor:
public class QueueMessageReceiver : IQueueMessageReceiver
{
private readonly ServiceBusClient _queueClient;
private ServiceBusProcessor _processor;
private readonly ReceiverConfiguration _configuration;
private readonly ILogger _logger;
private readonly ILoggerFactory _loggerFactory;
private Dictionary<string, string> _executionMatrix;
private readonly IServiceProvider _provider;
private CancellationToken _cancellationToken;
public QueueMessageReceiver(ReceiverConfiguration configuration, ILogger<QueueMessageReceiver> logger, IExecutionMatrix executionMatrix, ILoggerFactory loggerFactory, IServiceProvider serviceProvider)
{
if (configuration == null) throw new ArgumentException($"Configuration is missing from the expected ");
_configuration = configuration;
_logger = logger;
_loggerFactory = loggerFactory;
_executionMatrix = executionMatrix.GetExecutionMatrix();
_provider = serviceProvider;
_queueClient = new ServiceBusClient(_configuration.ConnectionString);
if (string.IsNullOrWhiteSpace(configuration.ConnectionString)) throw new ArgumentException($"ServiceBusConnectionString Object missing from the expected configuration under ConnectionStrings ");
if (configuration.QueueName == null) throw new ArgumentException($"Queue Name value missing from the expected configuration");
}
public async Task StartProcessor(CancellationToken cancellationToken)
{
if (!IsClosedOrClosing())
{
throw new FatalSystemException("ServiceBusProcessor is already running. ");
}
_cancellationToken = cancellationToken;
var options = new ServiceBusProcessorOptions
{
AutoCompleteMessages = _configuration.AutoComplete,
MaxConcurrentCalls = _configuration.MaxConcurrentCalls,
MaxAutoLockRenewalDuration = _configuration.MaxAutoRenewDuration
};
_processor = _queueClient.CreateProcessor(_configuration.QueueName, options);
_processor.ProcessMessageAsync += ProcessMessagesAsync;
_processor.ProcessErrorAsync += ProcessErrorAsync;
await _processor.StartProcessingAsync().ConfigureAwait(false);
}
public async Task StopProcessor()
{
await _processor.StopProcessingAsync();
await _processor.CloseAsync();
}
private Task ProcessErrorAsync(ProcessErrorEventArgs args)
{
_logger.LogError(args.Exception, "Uncaught handled exception", args.ErrorSource, args.FullyQualifiedNamespace, args.EntityPath);
return Task.CompletedTask;
}
private async Task ProcessMessagesAsync(ProcessMessageEventArgs args)
{
var message = args.Message;
// Process the message.
var sbMessage = $"Received message: SequenceNumber:{message.SequenceNumber} Body:{Encoding.UTF8.GetString(message.Body)}";
_logger.LogInformation(sbMessage);
//Handle your message
}
public bool IsClosedOrClosing()
{
return ((_processor == null) || _processor.IsClosed || !_processor.IsProcessing);
}
}
Upvotes: 3