Arjun_TECH

Reputation: 383

.NET Core Web API with queue processing

How to setup a .NET core web api that

Also, a routine which keeps checking the queue and processes the messages one by one.

As per the requirement, the API will act as the receiver of messages and may be hit hundreds of times a minute, while the messages it receives must be processed one by one. I am a bit new to web APIs, so I wonder whether such a setup is a good idea and, if so, how to put the different components together.

Thanks in advance.

Upvotes: 9

Views: 26922

Answers (2)

bravohex

Reputation: 1034

Update for latecomers: I used this in ASP.NET Core 6. You can download a sample here: https://learn.microsoft.com/en-us/dotnet/core/extensions/queue-service

Configuration in Program.cs:

// and more...
#region Worker Services
builder.Host.ConfigureServices((context, services) =>
{
    services.AddSingleton<MonitorLoop>();
    services.AddHostedService<QueuedHostedService>();
    services.AddSingleton<IBackgroundTaskQueue>(_ =>
    {
        if (!int.TryParse(context.Configuration["QueueCapacity"], out var queueCapacity))
        {
            queueCapacity = 100;
        }
        return new BackgroundTaskQueue(queueCapacity);
    });
});
#endregion

#region App
// App config
var app = builder.Build();

// Monitor worker config
var monitorLoop = app.Services.GetRequiredService<MonitorLoop>();
monitorLoop.StartMonitorLoop();

// Configure the HTTP request pipeline.
if (app.Environment.IsDevelopment())
{
    app.UseSwagger();
    app.UseSwaggerUI();
}
else
{
    app.UseHttpsRedirection();
}

app.UseAuthentication();
app.UseAuthorization();

app.MapControllers();

await app.RunAsync();
#endregion

In the controller:

// and more..
private readonly IMailService _mailService;
private readonly IBackgroundTaskQueue _queue;
// and more..
public AuthenticateController(
    IMailService mailService,
    IBackgroundTaskQueue queue)
{

    _mailService = mailService;
    _queue = queue;
}

[HttpPost]
[Route("forgot-password")]
public async Task<IActionResult> ForgotPassword([FromBody] ForgotPasswordModel model)
{
    // and more...
    // Queue processing
    await _queue.QueueBackgroundWorkItemAsync(async (token) =>
    {
        await _mailService.SendAsync(mailData, token);
    });

    return Ok();
}
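
The registration above calls new BackgroundTaskQueue(queueCapacity) and the controller calls QueueBackgroundWorkItemAsync, but the queue class itself is not shown. A minimal sketch of what it could look like, assuming it follows the bounded Channel approach from the linked Microsoft article (the exact member signatures here are an assumption, not taken from this answer):

```csharp
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public interface IBackgroundTaskQueue
{
    ValueTask QueueBackgroundWorkItemAsync(Func<CancellationToken, ValueTask> workItem);

    ValueTask<Func<CancellationToken, ValueTask>> DequeueAsync(CancellationToken cancellationToken);
}

// Assumed implementation: a bounded channel that holds the pending work items.
public sealed class BackgroundTaskQueue : IBackgroundTaskQueue
{
    private readonly Channel<Func<CancellationToken, ValueTask>> _queue;

    public BackgroundTaskQueue(int capacity)
    {
        // When the queue is full, writers wait asynchronously instead of
        // dropping items, which applies back-pressure to the callers.
        var options = new BoundedChannelOptions(capacity)
        {
            FullMode = BoundedChannelFullMode.Wait
        };
        _queue = Channel.CreateBounded<Func<CancellationToken, ValueTask>>(options);
    }

    public async ValueTask QueueBackgroundWorkItemAsync(Func<CancellationToken, ValueTask> workItem)
    {
        ArgumentNullException.ThrowIfNull(workItem);
        await _queue.Writer.WriteAsync(workItem);
    }

    public async ValueTask<Func<CancellationToken, ValueTask>> DequeueAsync(CancellationToken cancellationToken)
    {
        // Completes as soon as an item is available, or throws on cancellation.
        return await _queue.Reader.ReadAsync(cancellationToken);
    }
}
```

With a bounded queue, a burst of requests beyond the configured capacity makes the controller's await wait until the worker catches up, so the capacity value is worth tuning for the expected load.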

Hope this helps!

Upvotes: 2

Alex Riabov

Reputation: 9165

Honestly, I don't think it makes sense to receive and process messages in the same process, so I would recommend using an external messaging system such as RabbitMQ, Kafka, or any other system you prefer: the API puts the messages there and a separate process consumes them. It's quite a big topic; you can start from this tutorial.

If you still want to keep everything in one process, that is also possible: create a background task queue, put your messages on it, and create a background task that consumes them from that queue.

public interface IBackgroundTaskQueue
{
    void QueueBackgroundWorkItem(Func<CancellationToken, Task> workItem);

    Task<Func<CancellationToken, Task>> DequeueAsync(
        CancellationToken cancellationToken);
}

public class BackgroundTaskQueue : IBackgroundTaskQueue
{
    private readonly ConcurrentQueue<Func<CancellationToken, Task>> _workItems =
        new ConcurrentQueue<Func<CancellationToken, Task>>();
    private readonly SemaphoreSlim _signal = new SemaphoreSlim(0);

    public void QueueBackgroundWorkItem(
        Func<CancellationToken, Task> workItem)
    {
        if (workItem == null)
        {
            throw new ArgumentNullException(nameof(workItem));
        }

        _workItems.Enqueue(workItem);
        _signal.Release();
    }

    public async Task<Func<CancellationToken, Task>> DequeueAsync(
        CancellationToken cancellationToken)
    {
        await _signal.WaitAsync(cancellationToken);
        _workItems.TryDequeue(out var workItem);

        return workItem;
    }
}

Background task:

public class QueuedHostedService : BackgroundService
{
    private readonly ILogger _logger;

    public QueuedHostedService(IBackgroundTaskQueue taskQueue, 
        ILoggerFactory loggerFactory)
    {
        TaskQueue = taskQueue;
        _logger = loggerFactory.CreateLogger<QueuedHostedService>();
    }

    public IBackgroundTaskQueue TaskQueue { get; }

    protected async override Task ExecuteAsync(
        CancellationToken cancellationToken)
    {
        _logger.LogInformation("Queued Hosted Service is starting.");

        while (!cancellationToken.IsCancellationRequested)
        {
            var workItem = await TaskQueue.DequeueAsync(cancellationToken);

            try
            {
                await workItem(cancellationToken);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, 
                   $"Error occurred executing {nameof(workItem)}.");
            }
        }

        _logger.LogInformation("Queued Hosted Service is stopping.");
    }
}

Registration:

public void ConfigureServices(IServiceCollection services)
{
    services.AddHostedService<QueuedHostedService>();
    services.AddSingleton<IBackgroundTaskQueue, BackgroundTaskQueue>();
}

Inject to controller:

public class ApiController : ControllerBase
{
    private readonly IBackgroundTaskQueue queue;

    public ApiController(IBackgroundTaskQueue queue)
    {
        this.queue = queue;
    }

    public IActionResult StartProcessing()
    {
        // Enqueue the work and return immediately; the hosted service runs it later.
        queue.QueueBackgroundWorkItem(async token =>
        {
            // put processing code here
        });

        return Ok();
    }
}

You can modify BackgroundTaskQueue to fit your requirements, but I hope you understand the idea behind this.
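
As one example of such a modification, the queue can carry the messages themselves rather than delegates, so a single consumer handles them strictly one at a time. This is only a sketch under assumptions: IncomingMessage, MessageQueue and Demo are hypothetical names, and System.Threading.Channels replaces the ConcurrentQueue/SemaphoreSlim pair used above.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical payload type; replace with whatever the API actually receives.
public sealed record IncomingMessage(int Id, string Body);

public sealed class MessageQueue
{
    // Unbounded channel: many writers (request handlers), one reader (the worker).
    private readonly Channel<IncomingMessage> _channel =
        Channel.CreateUnbounded<IncomingMessage>(
            new UnboundedChannelOptions { SingleReader = true });

    // Safe to call from concurrent requests; returns false only after Complete().
    public bool Enqueue(IncomingMessage message) => _channel.Writer.TryWrite(message);

    // Signals that no more messages will be written.
    public void Complete() => _channel.Writer.Complete();

    public IAsyncEnumerable<IncomingMessage> ReadAllAsync(CancellationToken cancellationToken) =>
        _channel.Reader.ReadAllAsync(cancellationToken);
}

public static class Demo
{
    public static async Task<List<int>> RunAsync()
    {
        var queue = new MessageQueue();
        var processed = new List<int>();

        // Simulate many concurrent requests enqueuing messages.
        Parallel.For(0, 100, i => queue.Enqueue(new IncomingMessage(i, $"message {i}")));
        queue.Complete();

        // Single consumer loop: the next message is read only after the
        // current one has been handled.
        await foreach (var message in queue.ReadAllAsync(CancellationToken.None))
        {
            processed.Add(message.Id);
        }

        return processed;
    }
}
```

In a real application the await foreach loop would live inside ExecuteAsync of the hosted service and receive its cancellation token, and the controller would only call Enqueue.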

Upvotes: 28
