Nikita Sychou
Nikita Sychou

Reputation: 209

How can I pass parameters to queued background task (.NET Core)

In my web-application I have action with long-running task and I want to call this task in background. So, according to documentation .NET Core 3.1 Queued background tasks I use such code for this:

public interface IBackgroundTaskQueue
{
    ValueTask QueueBackgroundWorkItemAsync(Func<CancellationToken, ValueTask> workItem);

    ValueTask<Func<CancellationToken, ValueTask>> DequeueAsync(CancellationToken cancellationToken);
}

public class BackgroundTaskQueue : IBackgroundTaskQueue
{
    private readonly Channel<Func<CancellationToken, ValueTask>> _queue;

    public BackgroundTaskQueue(int capacity)
    {
        var options = new BoundedChannelOptions(capacity){FullMode = BoundedChannelFullMode.Wait};
        _queue = Channel.CreateBounded<Func<CancellationToken, ValueTask>>(options);
    }

    public async ValueTask QueueBackgroundWorkItemAsync(Func<CancellationToken, ValueTask> workItem)
    {
        if (workItem == null)throw new ArgumentNullException(nameof(workItem));
        await _queue.Writer.WriteAsync(workItem);
    }

    public async ValueTask<Func<CancellationToken, ValueTask>> DequeueAsync(CancellationToken cancellationToken)
    {
        var workItem = await _queue.Reader.ReadAsync(cancellationToken);
        return workItem;
    }
}

And hosted service

public class QueuedHostedService : BackgroundService
{
    private readonly ILogger<QueuedHostedService> _logger;

    public QueuedHostedService(IBackgroundTaskQueue taskQueue, ILogger<QueuedHostedService> logger)
    {
        TaskQueue = taskQueue;
        _logger = logger;
    }

    public IBackgroundTaskQueue TaskQueue { get; }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        await BackgroundProcessing(stoppingToken);
    }

    private async Task BackgroundProcessing(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            var workItem = await TaskQueue.DequeueAsync(stoppingToken);

            try
            {
                await workItem(stoppingToken);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Error occurred executing {WorkItem}.", nameof(workItem));
            }
        }
    }

    public override async Task StopAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation("Queued Hosted Service is stopping.");
        await base.StopAsync(stoppingToken);
    }
}

then I register all services

services.AddHostedService<QueuedHostedService>();
services.AddSingleton<IBackgroundTaskQueue>(new BackgroundTaskQueue(queueCapacity));

then I can success use this by calling without params like in such sample

public async Task<TenantBo> RegisterCompanyAsync(AddTenantBo addTenantBo)
{
  var tenantBo = new TenantBo();

  try
  {
    _companyRegistrationLogHelper.SetInfoLog(GetTenantId(tenantBo), 
      "Start create company: " + JsonConvert.SerializeObject(addTenantBo));

      InitOnCreateCompanyTasks(tenantBo);

      //skip if already create tenant 
      tenantBo = await CreateTenantAsync(tenantBo, addTenantBo);

      //run in background
      _companyRegistationQueue.QueueBackgroundWorkItemAsync(RunRegistrationCompanyMainAsync);

      return tenantBo;
  }
  catch (Exception e)
  {
    //some logs
    return tenantBo;
  }
}

private async ValueTask RunRegistrationCompanyMainAsync(CancellationToken cancellationToken)
{
  //some await Tasks
}

private async ValueTask RunRegistrationCompanyMainAsync(string tenantId, CancellationToken cancellationToken)
{
  //some await Tasks
}

so I can call only RunRegistrationCompanyMainAsync(CancellationToken cancellationToken) with one param and cannot call RunRegistrationCompanyMainAsync(string tenantId, CancellationToken cancellationToken) with two params

Can u help me to pass string param as argument for this task?

Upvotes: 11

Views: 4881

Answers (2)

Guru Stron
Guru Stron

Reputation: 143098

In QueueBackgroundWorkItemAsync(RunRegistrationCompanyMainAsync) call compiler actually performs cast from method group into a delegate. But to provide instance of Func delegate your are not limited to method groups, you can provide a lambda expression for example:

 var someTenantId = ....
 .....
_companyRegistationQueue.QueueBackgroundWorkItemAsync(ct => RunRegistrationCompanyMainAsync(someTenantId, ct));

Note that ideally RunRegistrationCompanyMainAsync should not capture dependencies of the "current" class (i.e. one with RegisterCompanyAsync) since it can lead to some issues (with DI scopes, object lifetimes and disposables for example).

Upvotes: 22

Nikita Sychou
Nikita Sychou

Reputation: 209

After some time I found solution. Just need to use Tuple like this

public class CompanyRegistationQueue : ICompanyRegistationQueue
    {
        private readonly Channel<Tuple<CreateCompanyModel, Func<CreateCompanyModel, CancellationToken, ValueTask>>> _queue;

        public CompanyRegistationQueue(int capacity)
        {
            var options = new BoundedChannelOptions(capacity) { FullMode = BoundedChannelFullMode.Wait };
            _queue = Channel.CreateBounded<Tuple<CreateCompanyModel, Func<CreateCompanyModel, CancellationToken, ValueTask>>**>(options);
        }

        public async ValueTask QueueBackgroundWorkItemAsync(Tuple<CreateCompanyModel, Func<CreateCompanyModel, CancellationToken, ValueTask>> workItem)
        {
            if (workItem == null) throw new ArgumentNullException(nameof(workItem));
            await _queue.Writer.WriteAsync(workItem);
        }

        public async ValueTask<Tuple<CreateCompanyModel, Func<CreateCompanyModel, CancellationToken, ValueTask>>> DequeueAsync(CancellationToken cancellationToken)
        {
            var workItem = await _queue.Reader.ReadAsync(cancellationToken);
            return workItem;
        }
    }

and then call it like this

private async Task BackgroundProcessing(CancellationToken stoppingToken)
        {
            while (!stoppingToken.IsCancellationRequested)
            {
                var workItem = await TaskQueue.DequeueAsync(stoppingToken);

                try
                {
//item2 is task
                    await workItem.Item2(workItem.Item1, stoppingToken);
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex, "Error occurred executing {WorkItem}.", nameof(workItem));
                }
            }
        }

invoke in code

var paramValue = new Tuple<CreateCompanyModel, Func<CreateCompanyModel, CancellationToken, ValueTask>>(createCompanyModel, RunRegistrationCompanyMainAsync);
                await _companyRegistationQueue.QueueBackgroundWorkItemAsync(paramValue);

P.S. May by Tuple not best solution but its work

Upvotes: 1

Related Questions