Coconut9
Coconut9

Reputation: 143

Executing non-blocking asynchronous tasks one after the other

I am using C#, TPL. I have a class that contains some asynchronous methods that execute some sub-tasks, for simplicity I will only consider one method and one sub-task:

class Test1
{
    private Task SubTask() => Task.Delay(1000);

    public async Task FullTask()
    {
        Console.WriteLine("Task Start");
        await SubTask();
        Console.WriteLine("Task Middle");
        await SubTask();
        Console.WriteLine("Task End");
    }

    static async Task Main()
    {
        Test1 Test = new Test1();
        Task Task1 = Test.FullTask();
        Task Task2 = Test.FullTask();
        await Task.WhenAll(Task1, Task2);
    }
}

Upon execution the following (expected) result gets printed in the console:

Task Start
Task Start
Task Middle
Task Middle
Task End
Task End

The problem is that each call to FullTask must run after the previous has been completed, if multiple calls to FullTask happen simultaneously, they must be processed one by one. My first idea was to use the ContinueWith method:

class Test2
{
    private Task LastTask = Task.CompletedTask;

    private Task SubTask() => Task.Delay(1000);

    public Task FullTask()
    {
        lock(LastTask)
        {
            return LastTask = LastTask.ContinueWith(_ =>
            {
                Console.WriteLine("Task Start");
                SubTask().Wait();
                Console.WriteLine("Task Middle");
                SubTask().Wait();
                Console.WriteLine("Task End");
            });
        }
    }

    static async Task Main()
    {
        Test2 Test = new Test2();
        Task Task1 = Test.FullTask();
        Task Task2 = Test.FullTask();
        await Task.WhenAll(Task1, Task2);
    }
}

Again, upon execution the following (expected) result gets printed in the console:

Task Start
Task Middle
Task End
Task Start
Task Middle
Task End

The problem is that the lambda inside the FullTask blocks the thread because it uses SubTask().Wait(); and not await SubTask();. If there exist many instances of the Test2 class each executing the FullTask method, thread pool starvation will occur. Changing ether FullTask or the lambda (or both) to async does not solve the issue:

class Test3
{
    private Task LastTask = Task.CompletedTask;

    private Task SubTask() => Task.Delay(1000);

    public Task FullTask()
    {
        lock(LastTask)
        {
            return LastTask = LastTask.ContinueWith(async _ =>
            {
                Console.WriteLine("Task Start");
                await SubTask();
                Console.WriteLine("Task Middle");
                await SubTask();
                Console.WriteLine("Task End");
            });
        }
    }

    static async Task Main()
    {
        Test3 Test = new Test3();
        Task Task1 = Test.FullTask();
        Task Task2 = Test.FullTask();
        await Task.WhenAll(Task1, Task2);
    }
}

The ContinueWith returns a Task<Task>, the outer Task being the scheduled task to be executed after the LastTask. That task will end at the first await and it will return the inner (compiler generated) Task that will end at the end of the lambda. Here, I am intersected in the inner task which does not get created before the outer task reaches the first await. Thus approach does not work.

What I want is a non blocking approach that produces the same results as Test. Any ideas?

Upvotes: 2

Views: 476

Answers (1)

Stephen Cleary
Stephen Cleary

Reputation: 457402

If you want an iteration of the ContinueWith approach (using the more modern await), something like this should work:

private readonly object _lastTaskMutex = new object();
public Task FullTask()
{
  lock (_lastTaskMutex)
    return LastTask = RunAfterAsync(LastTask);

  async Task RunAfterAsync(Task lastTask)
  {
    try
    {
      await lastTask;
    }
    catch { }
    Console.WriteLine("Task Start");
    await SubTask();
    Console.WriteLine("Task Middle");
    await SubTask();
    Console.WriteLine("Task End");
  }
}

If you just care about mutual exclusion and the exact order doesn't matter, then a SemaphoreSlim would work:

private readonly SemaphoreSlim _mutex = new SemaphoreSlim(1);
public async Task FullTask()
{
  await _mutex.WaitAsync();
  try
  {
    Console.WriteLine("Task Start");
    await SubTask();
    Console.WriteLine("Task Middle");
    await SubTask();
    Console.WriteLine("Task End");
  }
  finally
  {
    _mutex.Release();
  }
}

Or, if what you really want is a strict-FIFO queue of operations, then a Channel or ActionBlock as suggested by Patagonias is appropriate; in this case, you usually want to pass a TaskCompletionSource<T> to indicate when individual requests have completed:

private readonly ActionBlock<TaskCompletionSource<object>> _block = new ActionBlock<TaskCompletionSource<object>>(async tcs =>
{
  try
  {
    Console.WriteLine("Task Start");
    await SubTask();
    Console.WriteLine("Task Middle");
    await SubTask();
    Console.WriteLine("Task End");
    tcs.TrySetResult(null);
  }
  catch (Exception ex)
  {
    tcs.TrySetException(ex);
  }
});

public Task FullTask()
{
  var tcs = new TaskCompletionSource<object>();
  _block.Post(tcs);
  return tcs.Task;
}

Upvotes: 6

Related Questions