Reputation: 981
I am looking at processing a hierarchical structure recursively using C# asynchronous features (TPL, async/await). Here is an overview of what I am trying to do.
I have a jobs collection to process, as shown below. Each job has something to do and can optionally have one or more children, which also have something to do. All parent and child jobs call the same function to do the actual "work", and this function is asynchronous (code below).
/*
 * Jobs Collection
 * |
 * |__ Job1
 * |    |__ Job4
 * |    |    |__ Job7
 * |    |
 * |    |__ Job5
 * |
 * |__ Job2
 * |    |__ Job6
 * |
 * |__ Job3
 * |
 */
There are 3 levels in the hierarchy.
I want to start processing the first level (Job1, Job2, Job3) in parallel.
Once they start in parallel, each individual job processes itself, waits for its own processing to complete (important), and then goes on to process its children recursively until the hierarchy ends. Children depend on the data processed by the parent, and hence they wait for parent processing to complete.
The actual work for each job (invoked by both parents and children) is an asynchronous call, so a "new thread" (Task.Factory.StartNew()) is not required.
Here is the sample code that I am using to demonstrate the scenario:
public void Process()
{
    WebJob[] jobs = CreateWebJobs(); // dummy jobs
    // first level
    Parallel.ForEach(jobs,
        new ParallelOptions { MaxDegreeOfParallelism = 2 }, // parallelism hardcoded for simplicity
        (job) => ExecuteJob(job));
}
private void ExecuteJob(WebJob job, [CallerMemberName] string memberName = "")
{
    Console.ForegroundColor = ConsoleColor.DarkYellow;
    Console.WriteLine("Caller> {0} :: {1} Job> {2} :: {3} Thread> {4}", memberName, "\t", job.Name, "\t", Thread.CurrentThread.ManagedThreadId);
    Task t = GetDataAsync(job);
    t.Wait(); // needed so that the parent response is received before the children start (?).
    if (job.Children != null)
    {
        job.Children.ToList().ForEach((r) =>
        {
            r.ParentResponse = job.Response; // Children need parent's response
            ExecuteJob(r);
        });
    }
}
private async Task GetDataAsync(WebJob j)
{
    // This is just test code. Ideally it would be an external call to some "async" method
    await Task.Delay(1000);
    j.Response = string.Format("{0} complete", j.Name);
    Console.ForegroundColor = ConsoleColor.Cyan;
    Console.WriteLine("parentResp>> {0} :: {1} Job>> {2} :: {3} Thread>> {4}", j.ParentResponse, "\t", j.Name, "\t", Thread.CurrentThread.ManagedThreadId);
    Console.WriteLine("--------------");
}
private WebJob[] CreateWebJobs()
{
    return new WebJob[] {
        new WebJob() { Id=1, Name = "Job1", ExecURL = "http://url1",
            Children = new WebJob[]
            {
                new WebJob()
                {
                    Id=2, Name = "Job2", ExecURL = "http://url2",
                    Children = new WebJob[]
                    {
                        new WebJob() { Id=4, Name = "Job4", ExecURL = "http://url4" }
                    }
                },
                new WebJob()
                {
                    Id=3, Name = "Job3", ExecURL = "http://url3"
                }
            }
        },
        new WebJob() { Id=5, Name = "Job5", ExecURL = "http://url5"}
    };
}
This works, but I am not convinced that this recursive async pattern is an efficient approach. I would like to avoid t.Wait(). I have tried ContinueWith on t, which appears no different as far as I understand. I also read about the ForEachAsync pattern and was wondering if that would be a fit. This solution would eventually be an ASP.NET Web API service. Any thoughts on this recursive async pattern?
Upvotes: 3
Views: 1220
Reputation: 54877
If GetDataAsync is the only blocking operation that you have, then you can use asynchronous programming throughout, avoiding the need for Parallel.ForEach calls or blocking Wait calls.
public async Task Process()
{
    WebJob[] jobs = CreateWebJobs(); // dummy jobs
    // A lambda is used because ExecuteJob has an optional parameter,
    // so the method group does not convert to Func<WebJob, Task>.
    await Task.WhenAll(jobs.Select(job => ExecuteJob(job)));
}
private async Task ExecuteJob(WebJob job, [CallerMemberName] string memberName = "")
{
    Console.ForegroundColor = ConsoleColor.DarkYellow;
    Console.WriteLine("Caller> {0} :: {1} Job> {2} :: {3} Thread> {4}", memberName, "\t", job.Name, "\t", Thread.CurrentThread.ManagedThreadId);
    await GetDataAsync(job); // the parent's response is available before any child starts
    if (job.Children != null)
    {
        var childTasks = job.Children.Select(r =>
        {
            r.ParentResponse = job.Response;
            return ExecuteJob(r);
        });
        await Task.WhenAll(childTasks);
    }
}
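Edit: If the top-level method should block (rather than risk having consumers fire-and-forget), use Task.WaitAll. Be aware that blocking on asynchronous code can deadlock when a synchronization context is present, as in classic ASP.NET.
public void Process()
{
    WebJob[] jobs = CreateWebJobs(); // dummy jobs
    // Task.WaitAll expects a Task[], hence ToArray(); the lambda is needed
    // because ExecuteJob has an optional parameter.
    Task.WaitAll(jobs.Select(job => ExecuteJob(job)).ToArray());
}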
Upvotes: 4
Reputation: 456437
Since your core is asynchronous, you shouldn't be using parallel or multithreading at all. What you want is concurrency without parallelism - that is, asynchronous concurrency, usually done with Task.WhenAll.
This is doubly true since you're planning to deploy to ASP.NET, where parallelism can significantly reduce your scalability.
public async Task ProcessAsync()
{
    WebJob[] jobs = CreateWebJobs();
    await Task.WhenAll(jobs.Select(x => ExecuteJobAsync(x)));
}
private async Task ExecuteJobAsync(WebJob job, [CallerMemberName] string memberName = "")
{
    Console.ForegroundColor = ConsoleColor.DarkYellow;
    Console.WriteLine("Caller> {0} :: {1} Job> {2} :: {3} Thread> {4}", memberName, "\t", job.Name, "\t", Thread.CurrentThread.ManagedThreadId);
    await GetDataAsync(job);
    if (job.Children != null)
    {
        var childTasks = job.Children.Select(async x =>
        {
            x.ParentResponse = job.Response; // Children need parent's response
            await ExecuteJobAsync(x);
        });
        await Task.WhenAll(childTasks);
    }
}
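Note that Task.WhenAll starts every job at once, so the MaxDegreeOfParallelism = 2 cap from the question is gone. If a cap still matters, one common way to get it is to gate the asynchronous work with a SemaphoreSlim. What follows is only a minimal sketch under assumptions: the WebJob class is a cut-down, hypothetical stand-in for the question's type, ThrottledProcessor is an illustrative name, and GetDataAsync is the same placeholder delay as in the question. The limit here applies to GetDataAsync calls across the whole tree, which is not the same as Parallel.ForEach limiting only the first level.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical, cut-down stand-in for the question's WebJob type.
public class WebJob
{
    public string Name { get; set; }
    public string Response { get; set; }
    public string ParentResponse { get; set; }
    public WebJob[] Children { get; set; }
}

// Illustrative name; not part of the original code.
public class ThrottledProcessor
{
    private readonly SemaphoreSlim _throttle;

    public ThrottledProcessor(int maxConcurrency)
    {
        _throttle = new SemaphoreSlim(maxConcurrency);
    }

    public Task ProcessAsync(IEnumerable<WebJob> jobs)
    {
        return Task.WhenAll(jobs.Select(job => ExecuteJobAsync(job)));
    }

    private async Task ExecuteJobAsync(WebJob job)
    {
        // Hold a slot only while this job's own work runs. Holding it while
        // waiting for the children could deadlock once every slot is owned
        // by a parent that is waiting on its children.
        await _throttle.WaitAsync();
        try
        {
            await GetDataAsync(job);
        }
        finally
        {
            _throttle.Release();
        }

        if (job.Children == null)
        {
            return;
        }

        await Task.WhenAll(job.Children.Select(child =>
        {
            child.ParentResponse = job.Response; // children need the parent's response
            return ExecuteJobAsync(child);
        }));
    }

    private static async Task GetDataAsync(WebJob job)
    {
        await Task.Delay(100); // placeholder for the real asynchronous call
        job.Response = string.Format("{0} complete", job.Name);
    }
}
```

With this in place, the top-level call would look like await new ThrottledProcessor(2).ProcessAsync(CreateWebJobs()); the parent-before-children ordering is unchanged, since each job still awaits its own GetDataAsync before starting its children.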
Upvotes: 2