Reputation: 7943
I have a method that will spawn lots of CPU-bound workers with Task.Run()
. Each worker may in turn spawn more workers, but I'm guaranteed that eventually, all workers will stop executing. My first thought was writing my method like this:
public Result OrchestrateWorkers(WorkItem[] workitems)
{
this.countdown = new CountdownEvent(0);
this.results = new ConcurrentQueue<WorkerResult>();
foreach (var workItem in workitems)
{
SpawnWorker(workItem);
}
this.countdown.Wait(); // until all spawned workers have completed.
return ComputeTotalResult(this.results);
}
The public SpawnWorker
method is used to start a worker, and to keep track of when they complete by enqueueing the worker's result and decrementing the countdown.
public void SpawnWorker(WorkItem workItem)
{
this.countdown.AddCount();
Task.Run(() => {
// Worker is passed an instance of this class
// so it can call SpawnWorker if it needs to.
var worker = new Worker(workItem, this);
var result = worker.DoWork();
this.results.Enqueue(result);
countdown.Signal();
});
}
Each worker can call SpawnWorker
as much as they like, but they're guaranteed to terminate at some point.
In this design, the thread that calls OrchestrateWorkers
will block untill all the workers have completed. My thinking is that it's a shame that there's a blocked thread; it would be nice if it could be doing work as well.
Would it be better to rearchitect the solution to something like this?
public Task<Result> OrchestrateWorkersAsync(WorkItem[] workitems)
{
if (this.tcs is not null) throw InvalidOperationException("Already running!");
this.tcs = new TaskCompletionSource<Result>();
this.countdown = 0; // just a normal integer.
this.results = new ConcurrentQueue<WorkerResult>();
foreach (var workItem in workitems)
{
SpawnWorker(workItem);
}
return tcs.Task;
}
public void SpawnWorker(WorkItem workItem)
{
Interlocked.Increment(ref this.countdown);
Task.Run(() => {
var worker = new Worker(workItem, this);
var result = worker.DoWork();
this.results.Enqueue(result);
if (Interlocked.Decrement(ref countdown) == 0)
{
this.tcs.SetResult(this.ComputeTotalResult(this.results));
}
});
}
EDIT: I've added a more full-fleshed sample below. It should be compileable and runnable. I'm seeing a ~10% performance improvement on my 8-core system, but I want to make sure this is the "canonical" way to orchestrate a swarm of spawning tasks.
using System.Collections.Concurrent;
using System.Diagnostics;
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Linq;
public class Program
{
const int ITERATIONS = 2500000;
const int WORKERS = 200;
public static async Task Main()
{
var o = new Orchestrator<int, int>();
var oo = new OrchestratorAsync<int, int>();
var array = Enumerable.Range(0, WORKERS);
var result = Time(() => o.OrchestrateWorkers(array, DoWork));
Console.Error.WriteLine("Sync spawned {0} workers", result.Count());
var resultAsync = await TimeAsync(() => oo.OrchestrateWorkersAsync(array, DoWorkAsync));
Console.Error.WriteLine("Async spawned {0} workers", resultAsync.Count());
}
static async Task<T> TimeAsync<T>(Func<Task<T>> work)
{
var sw = new Stopwatch();
sw.Start();
var result = await work();
sw.Stop();
Console.WriteLine("Total async time: {0}", sw.ElapsedMilliseconds);
return result;
}
static T Time<T>(Func<T> work)
{
var sw = new Stopwatch();
sw.Start();
var result = work();
sw.Stop();
Console.WriteLine("Total time: {0}", sw.ElapsedMilliseconds);
return result;
}
static int DoWork(int x, Orchestrator<int, int> arg2)
{
var rnd = new Random();
int n = 0;
for (int i = 0; i < ITERATIONS; ++i)
{
n += rnd.Next();
}
if (x >= 0)
{
arg2.SpawnWorker(-1, DoWork);
arg2.SpawnWorker(-1, DoWork);
}
return n;
}
static int DoWorkAsync(int x, OrchestratorAsync<int, int> arg2)
{
var rnd = new Random();
int n = 0;
for (int i = 0; i < ITERATIONS; ++i)
{
n += rnd.Next();
}
if (x >= 0)
{
arg2.SpawnWorker(-1, DoWorkAsync);
arg2.SpawnWorker(-1, DoWorkAsync);
}
return n;
}
public class Orchestrator<TWorkItem, TResult>
{
private ConcurrentQueue<TResult> results;
private CountdownEvent countdownEvent;
public Orchestrator()
{
this.results = new();
this.countdownEvent = new(1);
}
public IEnumerable<TResult> OrchestrateWorkers(
IEnumerable<TWorkItem> workItems,
Func<TWorkItem, Orchestrator<TWorkItem, TResult>, TResult> worker)
{
foreach (var workItem in workItems)
{
SpawnWorker(workItem, worker);
}
countdownEvent.Signal();
countdownEvent.Wait();
return results;
}
public void SpawnWorker(
TWorkItem workItem,
Func<TWorkItem, Orchestrator<TWorkItem, TResult>, TResult> worker)
{
this.countdownEvent.AddCount(1);
Task.Run(() =>
{
var result = worker(workItem, this);
this.results.Enqueue(result);
countdownEvent.Signal();
});
}
}
public class OrchestratorAsync<TWorkItem, TResult>
{
private ConcurrentQueue<TResult> results;
private volatile int countdown;
private TaskCompletionSource<IEnumerable<TResult>> tcs;
public OrchestratorAsync()
{
this.results = new();
this.countdown = 0;
this.tcs = new TaskCompletionSource<IEnumerable<TResult>>();
}
public Task<IEnumerable<TResult>> OrchestrateWorkersAsync(
IEnumerable<TWorkItem> workItems,
Func<TWorkItem, OrchestratorAsync<TWorkItem, TResult>, TResult> worker)
{
this.countdown = 0; // just a normal integer.
foreach (var workItem in workItems)
{
SpawnWorker(workItem, worker);
}
return tcs.Task;
}
public void SpawnWorker(TWorkItem workItem,
Func<TWorkItem, OrchestratorAsync<TWorkItem, TResult>, TResult> worker)
{
Interlocked.Increment(ref this.countdown);
Task.Run(() =>
{
var result = worker(workItem, this);
this.results.Enqueue(result);
if (Interlocked.Decrement(ref countdown) == 0)
{
this.tcs.SetResult(this.results);
}
});
}
}
}
Upvotes: 1
Views: 72
Reputation: 456587
There's one big problem with the code as-written: the tasks fired off by Task.Run
are discarded. This means there's no way to detect if anything goes wrong (i.e., an exception). It also means that there's not an easy way to aggregate results during execution, which is a common requirement; this lack of natural result handling is making the code collect results "out of band" in a separate collection.
These are the flags that this code is asking for adjustment to its structure. This is actual parallel code (i.e., not asynchronous), so parallel patterns are appropriate. You don't know how many tasks you need initially, so basic Data/Task Parallelism (such as a Parallel
or PLINQ approach) won't suffice. At this point, you're needing Dynamic Task Parallelism, which is the most complex kind of parallelism. The TPL does support it, but your code just has to use the lower-level APIs to get it done.
Since you have dynamically-added work and since your structure is generally tree-shaped (each work can add other work), you can introduce an artificial root and then use child tasks. This will give you two - and possibly three - benefits:
Example code (ignoring (3) since it's not clear whether results can be reduced):
public class OrchestratorParentChild<TWorkItem, TResult>
{
private readonly ConcurrentQueue<TResult> results = new();
public IEnumerable<TResult> OrchestrateWorkers(
IEnumerable<TWorkItem> workItems,
Func<TWorkItem, OrchestratorParentChild<TWorkItem, TResult>, TResult> worker)
{
var rootTask = Task.Factory.StartNew(
() =>
{
foreach (var workItem in workItems)
SpawnWorker(workItem, worker);
},
default,
TaskCreationOptions.None,
TaskScheduler.Default);
rootTask.Wait();
return results;
}
public void SpawnWorker(
TWorkItem workItem,
Func<TWorkItem, OrchestratorParentChild<TWorkItem, TResult>, TResult> worker)
{
_ = Task.Factory.StartNew(
() => results.Enqueue(worker(workItem, this)),
default,
TaskCreationOptions.AttachedToParent,
TaskScheduler.Default);
}
}
Note that an "orchestrator" isn't normally used. Code using the Dynamic Task Parallelism pattern usually just calls StartNew
directly instead of calling some orchestrator "spawn work" method.
In case you're wondering how this may look with results, here's one possibility:
public class OrchestratorParentChild<TWorkItem, TResult>
{
public TResult OrchestrateWorkers(
IEnumerable<TWorkItem> workItems,
Func<TWorkItem, OrchestratorParentChild<TWorkItem, TResult>, Func<IEnumerable<TResult>, TResult>, TResult> worker,
Func<IEnumerable<TResult>, TResult> resultReducer)
{
var rootTask = Task.Factory.StartNew(
() =>
{
var childTasks = workItems.Select(x => SpawnWorker(x, worker, resultReducer)).ToArray();
Task.WaitAll(childTasks);
return resultReducer(childTasks.Select(x => x.Result));
},
default,
TaskCreationOptions.None,
TaskScheduler.Default);
return rootTask.Result;
}
public Task<TResult> SpawnWorker(
TWorkItem workItem,
Func<TWorkItem, OrchestratorParentChild<TWorkItem, TResult>, Func<IEnumerable<TResult>, TResult>, TResult> worker,
Func<IEnumerable<TResult>, TResult> resultReducer)
{
return Task.Factory.StartNew(
() => worker(workItem, this, resultReducer),
default,
TaskCreationOptions.AttachedToParent,
TaskScheduler.Default);
}
}
As a final note, I rarely plug my book on this site, but you may find it helpful. Also a copy of "Parallel Programming with Microsoft® .NET: Design Patterns for Decomposition and Coordination on Multicore Architectures" if you can find it; it's a bit out of date in some places but still good overall if you want to do TPL programming.
Upvotes: 3