John Källén
John Källén

Reputation: 7943

Is it better to block on an event in CPU-bound multithreaded method than making it async?

I have a method that will spawn lots of CPU-bound workers with Task.Run(). Each worker may in turn spawn more workers, but I'm guaranteed that eventually, all workers will stop executing. My first thought was writing my method like this:

/// <summary>
/// Spawns one worker per work item, blocks until every worker (including any
/// workers spawned by other workers) has finished, and returns the combined result.
/// </summary>
/// <param name="workitems">The initial work items; one worker is spawned for each.</param>
/// <returns>The value computed by <c>ComputeTotalResult</c> over all collected worker results.</returns>
public Result OrchestrateWorkers(WorkItem[] workitems)
{
    // Start at 1, not 0: a CountdownEvent created with a count of 0 is already
    // signalled, and AddCount() on a signalled event throws InvalidOperationException.
    // The extra count is the orchestrator's own "hold"; it also keeps the event from
    // reaching zero while the initial work items are still being spawned.
    this.countdown = new CountdownEvent(1);
    this.results = new ConcurrentQueue<WorkerResult>();
    foreach (var workItem in workitems)
    {
        SpawnWorker(workItem);
    }
    // Release the orchestrator's hold now that every initial worker is registered.
    this.countdown.Signal();
    this.countdown.Wait(); // until all spawned workers have completed.
    return ComputeTotalResult(this.results);
}

The public SpawnWorker method is used to start a worker, and to keep track of when they complete by enqueueing the worker's result and decrementing the countdown.

/// <summary>
/// Registers one more outstanding worker on the countdown and starts it on the
/// thread pool. The worker's result is enqueued when it finishes.
/// </summary>
/// <param name="workItem">The work item handed to the new worker.</param>
public void SpawnWorker(WorkItem workItem)
{
    // Register before starting the task so the count can never reach zero
    // while this worker is still pending.
    this.countdown.AddCount();
    _ = Task.Run(() => {
        try
        {
            // Worker is passed an instance of this class
            // so it can call SpawnWorker if it needs to.
            var worker = new Worker(workItem, this);
            var result = worker.DoWork();
            this.results.Enqueue(result);
        }
        finally
        {
            // Always signal, even when DoWork throws; otherwise the thread blocked
            // in countdown.Wait() would never wake up. The exception itself stays on
            // the discarded task and is not reported to the caller.
            this.countdown.Signal();
        }
    });
}

Each worker can call SpawnWorker as much as they like, but they're guaranteed to terminate at some point.

In this design, the thread that calls OrchestrateWorkers will block until all the workers have completed. My thinking is that it's a shame that there's a blocked thread; it would be nice if it could be doing work as well.

Would it be better to rearchitect the solution to something like this?

/// <summary>
/// Spawns one worker per work item and returns a task that completes with the
/// combined result once every worker (including workers spawned by workers) is done.
/// </summary>
/// <param name="workitems">The initial work items; one worker is spawned for each.</param>
/// <returns>A task producing the value of <c>ComputeTotalResult</c> over all worker results.</returns>
/// <exception cref="InvalidOperationException">
/// Thrown when a run has already been started on this instance (the completion source is never reset).
/// </exception>
public Task<Result> OrchestrateWorkersAsync(WorkItem[] workitems)
{
    if (this.tcs is not null)
    {
        throw new InvalidOperationException("Already running!");
    }

    // Run awaiting continuations asynchronously so they do not execute inline on
    // whichever worker thread happens to complete the task.
    this.tcs = new TaskCompletionSource<Result>(TaskCreationOptions.RunContinuationsAsynchronously);

    // Start at 1 rather than 0: the orchestrator holds one count while it spawns the
    // initial workers. Without it, a fast worker could drive the count to zero (and
    // complete the task) before the remaining items are spawned, and an empty input
    // would never complete at all.
    this.countdown = 1;
    this.results = new ConcurrentQueue<WorkerResult>();
    foreach (var workItem in workitems)
    {
        SpawnWorker(workItem);
    }

    // Release the orchestrator's hold. If every worker has already finished
    // (or there were none), this call is the one that publishes the result.
    if (Interlocked.Decrement(ref this.countdown) == 0)
    {
        this.tcs.SetResult(this.ComputeTotalResult(this.results));
    }
    return this.tcs.Task;
}

/// <summary>
/// Counts one more outstanding worker and starts it on the thread pool. The worker
/// that brings the outstanding count back to zero completes the orchestration task.
/// </summary>
/// <param name="workItem">The work item handed to the new worker.</param>
public void SpawnWorker(WorkItem workItem)
{
    // Increment before starting the task so the count cannot reach zero
    // while this worker is still pending.
    Interlocked.Increment(ref this.countdown);
    _ = Task.Run(() => {
        try
        {
            // Worker is passed an instance of this class
            // so it can call SpawnWorker if it needs to.
            var worker = new Worker(workItem, this);
            this.results.Enqueue(worker.DoWork());
        }
        catch (Exception ex)
        {
            // Surface the failure through the orchestration task instead of losing it
            // on this discarded task. Workers already running are not cancelled.
            this.tcs.TrySetException(ex);
        }

        // Decrement on every path; skipping it after a failure would leave the
        // orchestration task incomplete forever.
        if (Interlocked.Decrement(ref this.countdown) == 0)
        {
            try
            {
                // TrySetResult is a no-op when a worker has already faulted the task.
                this.tcs.TrySetResult(this.ComputeTotalResult(this.results));
            }
            catch (Exception ex)
            {
                this.tcs.TrySetException(ex);
            }
        }
    });
}

EDIT: I've added a more fully fleshed-out sample below. It should be compilable and runnable. I'm seeing a ~10% performance improvement on my 8-core system, but I want to make sure this is the "canonical" way to orchestrate a swarm of spawning tasks.

using System.Collections.Concurrent;
using System.Diagnostics;
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Linq;

/// <summary>
/// Benchmark harness that times two ways of orchestrating CPU-bound workers that may
/// spawn further workers: a blocking <see cref="Orchestrator{TWorkItem, TResult}"/> built on
/// <see cref="CountdownEvent"/>, and an <see cref="OrchestratorAsync{TWorkItem, TResult}"/>
/// built on <see cref="TaskCompletionSource{TResult}"/>.
/// </summary>
public class Program
{
    const int ITERATIONS = 2500000;
    const int WORKERS = 200;

    /// <summary>Runs the blocking and the task-based orchestrator once each and prints timings.</summary>
    public static async Task Main()
    {
        var o = new Orchestrator<int, int>();
        var oo = new OrchestratorAsync<int, int>();
        var array = Enumerable.Range(0, WORKERS);

        var result = Time(() => o.OrchestrateWorkers(array, DoWork));
        Console.Error.WriteLine("Sync spawned {0} workers", result.Count());
        var resultAsync = await TimeAsync(() => oo.OrchestrateWorkersAsync(array, DoWorkAsync));
        Console.Error.WriteLine("Async spawned {0} workers", resultAsync.Count());
    }

    /// <summary>Awaits <paramref name="work"/>, prints the elapsed milliseconds, and returns its result.</summary>
    static async Task<T> TimeAsync<T>(Func<Task<T>> work)
    {
        var sw = Stopwatch.StartNew();
        var result = await work();
        sw.Stop();
        Console.WriteLine("Total async time: {0}", sw.ElapsedMilliseconds);
        return result;
    }

    /// <summary>Runs <paramref name="work"/>, prints the elapsed milliseconds, and returns its result.</summary>
    static T Time<T>(Func<T> work)
    {
        var sw = Stopwatch.StartNew();
        var result = work();
        sw.Stop();
        Console.WriteLine("Total time: {0}", sw.ElapsedMilliseconds);
        return result;
    }

    /// <summary>
    /// Synthetic CPU-bound load shared by both worker functions: sums
    /// <c>ITERATIONS</c> pseudo-random integers.
    /// </summary>
    /// <returns>The (intentionally wrapping) sum of the generated numbers.</returns>
    static int BurnCpu()
    {
        var rnd = new Random();
        int n = 0;
        // The sum is expected to overflow; make the wrap-around explicit so the
        // benchmark behaves the same even if the project enables checked arithmetic.
        unchecked
        {
            for (int i = 0; i < ITERATIONS; ++i)
            {
                n += rnd.Next();
            }
        }
        return n;
    }

    /// <summary>
    /// Worker for the blocking orchestrator. Top-level items (<paramref name="x"/> &gt;= 0)
    /// spawn two leaf workers (item -1) after doing their own work.
    /// </summary>
    static int DoWork(int x, Orchestrator<int, int> arg2)
    {
        int n = BurnCpu();
        if (x >= 0)
        {
            arg2.SpawnWorker(-1, DoWork);
            arg2.SpawnWorker(-1, DoWork);
        }
        return n;
    }

    /// <summary>
    /// Worker for the task-based orchestrator. Top-level items (<paramref name="x"/> &gt;= 0)
    /// spawn two leaf workers (item -1) after doing their own work.
    /// </summary>
    static int DoWorkAsync(int x, OrchestratorAsync<int, int> arg2)
    {
        int n = BurnCpu();
        if (x >= 0)
        {
            arg2.SpawnWorker(-1, DoWorkAsync);
            arg2.SpawnWorker(-1, DoWorkAsync);
        }
        return n;
    }

    /// <summary>
    /// Orchestrator whose calling thread blocks on a <see cref="CountdownEvent"/> until
    /// every spawned worker has finished. An instance supports a single run: the
    /// countdown and the result queue are created once in the constructor.
    /// </summary>
    public class Orchestrator<TWorkItem, TResult>
    {
        private ConcurrentQueue<TResult> results;
        private ConcurrentQueue<Exception> failures;
        private CountdownEvent countdownEvent;

        public Orchestrator()
        {
            this.results = new();
            this.failures = new();
            // Initial count of 1 is the orchestrator's own hold; it is released in
            // OrchestrateWorkers once all initial workers have been registered.
            this.countdownEvent = new(1);
        }

        /// <summary>
        /// Spawns a worker per item and blocks until all workers, including those
        /// spawned by other workers, have finished.
        /// </summary>
        /// <param name="workItems">The initial work items.</param>
        /// <param name="worker">The function run for each work item; it may call <see cref="SpawnWorker"/>.</param>
        /// <returns>The collected worker results, in completion order.</returns>
        /// <exception cref="AggregateException">One or more workers threw.</exception>
        public IEnumerable<TResult> OrchestrateWorkers(
            IEnumerable<TWorkItem> workItems,
            Func<TWorkItem, Orchestrator<TWorkItem, TResult>, TResult> worker)
        {
            foreach (var workItem in workItems)
            {
                SpawnWorker(workItem, worker);
            }
            // Release the orchestrator's hold, then wait for the workers.
            countdownEvent.Signal();
            countdownEvent.Wait();
            if (!failures.IsEmpty)
            {
                throw new AggregateException(failures);
            }
            return results;
        }

        /// <summary>Registers and starts one worker on the thread pool.</summary>
        /// <param name="workItem">The item handed to <paramref name="worker"/>.</param>
        /// <param name="worker">The function to run; it receives this orchestrator so it can spawn more work.</param>
        public void SpawnWorker(
            TWorkItem workItem,
            Func<TWorkItem, Orchestrator<TWorkItem, TResult>, TResult> worker)
        {
            // Register before starting so the count cannot hit zero while this worker is pending.
            this.countdownEvent.AddCount(1);
            _ = Task.Run(() =>
            {
                try
                {
                    this.results.Enqueue(worker(workItem, this));
                }
                catch (Exception ex)
                {
                    // Record instead of losing the exception on the discarded task.
                    this.failures.Enqueue(ex);
                }
                finally
                {
                    // Signal on every path; a missed signal would block Wait() forever.
                    this.countdownEvent.Signal();
                }
            });
        }
    }

    /// <summary>
    /// Orchestrator that reports completion through a <see cref="TaskCompletionSource{TResult}"/>
    /// instead of blocking the caller. An instance supports a single run: the completion
    /// source and the result queue are created once in the constructor.
    /// </summary>
    public class OrchestratorAsync<TWorkItem, TResult>
    {
        private ConcurrentQueue<TResult> results;
        private ConcurrentQueue<Exception> failures;
        private volatile int countdown;
        private TaskCompletionSource<IEnumerable<TResult>> tcs;

        public OrchestratorAsync()
        {
            this.results = new();
            this.failures = new();
            this.countdown = 0;
            // Continuations run asynchronously so an awaiting caller does not resume
            // inline on the worker thread that completes the task.
            this.tcs = new TaskCompletionSource<IEnumerable<TResult>>(
                TaskCreationOptions.RunContinuationsAsynchronously);
        }

        /// <summary>
        /// Spawns a worker per item and returns a task that completes when all workers,
        /// including those spawned by other workers, have finished.
        /// </summary>
        /// <param name="workItems">The initial work items.</param>
        /// <param name="worker">The function run for each work item; it may call <see cref="SpawnWorker"/>.</param>
        /// <returns>
        /// A task producing the collected results, or faulting with the workers'
        /// exceptions if any worker threw.
        /// </returns>
        public Task<IEnumerable<TResult>> OrchestrateWorkersAsync(
            IEnumerable<TWorkItem> workItems,
            Func<TWorkItem, OrchestratorAsync<TWorkItem, TResult>, TResult> worker)
        {
            // The orchestrator holds one count while spawning. Starting from 0 would let
            // a fast worker reach zero (and complete the task) before the remaining items
            // are spawned, and an empty sequence would never complete at all.
            this.countdown = 1;
            foreach (var workItem in workItems)
            {
                SpawnWorker(workItem, worker);
            }
            CompleteOne();
            return tcs.Task;
        }

        /// <summary>Counts and starts one worker on the thread pool.</summary>
        /// <param name="workItem">The item handed to <paramref name="worker"/>.</param>
        /// <param name="worker">The function to run; it receives this orchestrator so it can spawn more work.</param>
        public void SpawnWorker(TWorkItem workItem,
            Func<TWorkItem, OrchestratorAsync<TWorkItem, TResult>, TResult> worker)
        {
            // Increment before starting so the count cannot hit zero while this worker is pending.
            Interlocked.Increment(ref this.countdown);
            _ = Task.Run(() =>
            {
                try
                {
                    this.results.Enqueue(worker(workItem, this));
                }
                catch (Exception ex)
                {
                    // Record instead of losing the exception on the discarded task.
                    this.failures.Enqueue(ex);
                }
                finally
                {
                    CompleteOne();
                }
            });
        }

        /// <summary>
        /// Releases one outstanding count; the caller that reaches zero completes the task.
        /// </summary>
        private void CompleteOne()
        {
            if (Interlocked.Decrement(ref this.countdown) == 0)
            {
                if (this.failures.IsEmpty)
                {
                    this.tcs.TrySetResult(this.results);
                }
                else
                {
                    this.tcs.TrySetException(this.failures);
                }
            }
        }
    }
}

Upvotes: 1

Views: 72

Answers (1)

Stephen Cleary
Stephen Cleary

Reputation: 456587

There's one big problem with the code as-written: the tasks fired off by Task.Run are discarded. This means there's no way to detect if anything goes wrong (i.e., an exception). It also means that there's not an easy way to aggregate results during execution, which is a common requirement; this lack of natural result handling is making the code collect results "out of band" in a separate collection.

These are red flags indicating that this code's structure needs adjusting. This is actual parallel code (i.e., not asynchronous), so parallel patterns are appropriate. You don't know how many tasks you need initially, so basic Data/Task Parallelism (such as a Parallel or PLINQ approach) won't suffice. At this point, you're needing Dynamic Task Parallelism, which is the most complex kind of parallelism. The TPL does support it, but your code just has to use the lower-level APIs to get it done.

Since you have dynamically-added work and since your structure is generally tree-shaped (each work can add other work), you can introduce an artificial root and then use child tasks. This will give you two - and possibly three - benefits:

  1. All exceptions are no longer ignored. Child task exceptions are propagated up to their parents, all the way to the root.
  2. You know when all the tasks are complete. Since parent tasks only complete when all their children complete, there's no need for a countdown event or any other orchestrating synchronization primitive; your code just has to wait on the root task, and all the work is done when that task completes.
  3. If it is possible/desirable to reduce results as you go (a common requirement), then the child tasks can return the results and you will end up with the already-reduced results as the result of your root task.

Example code (ignoring (3) since it's not clear whether results can be reduced):

/// <summary>
/// Runs dynamically spawned workers as child tasks attached to a single root task,
/// collecting each worker's result in a shared concurrent queue.
/// </summary>
public class OrchestratorParentChild<TWorkItem, TResult>
{
  private readonly ConcurrentQueue<TResult> results = new ConcurrentQueue<TResult>();

  /// <summary>
  /// Starts a root task that spawns one attached child per work item, then blocks
  /// until the root task (and therefore every attached child) has completed.
  /// </summary>
  /// <param name="workItems">The initial work items.</param>
  /// <param name="worker">The function run for each item; it may call <see cref="SpawnWorker"/>.</param>
  /// <returns>The queue of collected worker results.</returns>
  public IEnumerable<TResult> OrchestrateWorkers(
     IEnumerable<TWorkItem> workItems,
     Func<TWorkItem, OrchestratorParentChild<TWorkItem, TResult>, TResult> worker)
  {
    // Body of the artificial root: every SpawnWorker call made here (or from inside
    // a worker) attaches its task to whichever task is currently executing.
    void SpawnInitialWorkers()
    {
      foreach (var item in workItems)
      {
        SpawnWorker(item, worker);
      }
    }

    Task root = Task.Factory.StartNew(
       SpawnInitialWorkers,
       default,
       TaskCreationOptions.None,
       TaskScheduler.Default);

    // A parent task only completes once its attached children have completed.
    root.Wait();
    return results;
  }

  /// <summary>
  /// Starts <paramref name="worker"/> for <paramref name="workItem"/> as a task attached
  /// to the currently running task and enqueues its return value.
  /// </summary>
  /// <param name="workItem">The item handed to the worker.</param>
  /// <param name="worker">The function to run; it receives this orchestrator so it can spawn more work.</param>
  public void SpawnWorker(
      TWorkItem workItem,
      Func<TWorkItem, OrchestratorParentChild<TWorkItem, TResult>, TResult> worker)
  {
    Action runAndRecord = () =>
    {
      TResult outcome = worker(workItem, this);
      results.Enqueue(outcome);
    };

    _ = Task.Factory.StartNew(
        runAndRecord,
        default,
        TaskCreationOptions.AttachedToParent,
        TaskScheduler.Default);
  }
}

Note that an "orchestrator" isn't normally used. Code using the Dynamic Task Parallelism pattern usually just calls StartNew directly instead of calling some orchestrator "spawn work" method.

In case you're wondering how this may look with results, here's one possibility:

/// <summary>
/// Variant of the parent/child orchestrator in which workers return their results
/// through their tasks and a caller-supplied reducer combines them.
/// </summary>
public class OrchestratorParentChild<TWorkItem, TResult>
{
  /// <summary>
  /// Runs one attached child task per work item under a root task, waits for them,
  /// and returns the reducer's combination of their results.
  /// </summary>
  /// <param name="workItems">The initial work items.</param>
  /// <param name="worker">
  /// The function run for each item; it receives this orchestrator and the reducer so
  /// it can spawn and combine further work itself.
  /// </param>
  /// <param name="resultReducer">Combines a sequence of results into a single result.</param>
  /// <returns>The reduced result of all top-level workers.</returns>
  public TResult OrchestrateWorkers(
      IEnumerable<TWorkItem> workItems,
      Func<TWorkItem, OrchestratorParentChild<TWorkItem, TResult>, Func<IEnumerable<TResult>, TResult>, TResult> worker,
      Func<IEnumerable<TResult>, TResult> resultReducer)
  {
    // Body of the root task: start every child, wait for all of them, then reduce.
    TResult RunRoot()
    {
      var spawned = new List<Task<TResult>>();
      foreach (var item in workItems)
      {
        spawned.Add(SpawnWorker(item, worker, resultReducer));
      }

      Task<TResult>[] children = spawned.ToArray();
      Task.WaitAll(children);

      IEnumerable<TResult> childResults = children.Select(child => child.Result);
      return resultReducer(childResults);
    }

    Task<TResult> root = Task.Factory.StartNew(
        RunRoot,
        default,
        TaskCreationOptions.None,
        TaskScheduler.Default);
    return root.Result;
  }

  /// <summary>
  /// Starts <paramref name="worker"/> for <paramref name="workItem"/> as a task attached
  /// to the currently running task.
  /// </summary>
  /// <param name="workItem">The item handed to the worker.</param>
  /// <param name="worker">The function to run.</param>
  /// <param name="resultReducer">The reducer forwarded to the worker.</param>
  /// <returns>The task whose result is the worker's return value.</returns>
  public Task<TResult> SpawnWorker(
      TWorkItem workItem,
      Func<TWorkItem, OrchestratorParentChild<TWorkItem, TResult>, Func<IEnumerable<TResult>, TResult>, TResult> worker,
      Func<IEnumerable<TResult>, TResult> resultReducer)
  {
    Func<TResult> invokeWorker = () => worker(workItem, this, resultReducer);

    Task<TResult> child = Task.Factory.StartNew(
        invokeWorker,
        default,
        TaskCreationOptions.AttachedToParent,
        TaskScheduler.Default);
    return child;
  }
}

As a final note, I rarely plug my book on this site, but you may find it helpful. A copy of "Parallel Programming with Microsoft® .NET: Design Patterns for Decomposition and Coordination on Multicore Architectures" may also help, if you can find it; it's a bit out of date in some places but still good overall if you want to do TPL programming.

Upvotes: 3

Related Questions