Lalman
Lalman

Reputation: 981

Recursive Asynchronous calls with TPL/ async await

I am looking at processing a hierarchical structure in a recursive fashion using C# asynchronous features (TPL/ async/await). Here is an overview of what I am trying to do

I have a jobs collection to process as shown below. Each Job has something to do and optionally can have one or more children which also have something to do. All the parent and child jobs call the same function to do the actual "work" and this function in "asynchronous" (code below)

/*
 *  Jobs Collection
 *  |
 *  |__ Job1
 *  |    |__ Job4
 *  |    |     |__ Job7
 *  |    |
 *  |    |__ Job5
 *  |
 *  |__ Job2
 *  |    |__ Job6
 *  |
 *  |__ Job3
 *  |
 */
  1. There are 3 levels in the hierarchy.

  2. I would want to start processing the first level (Job1, Job2, Job3) in parallel.

  3. Once they start in parallel, each individual job will start processing itself, wait for its processing to complete (important) and then will go on to process its children recursively until the hierarchy ends. Children are dependent on the data processed by the parent and hence they wait for parent processing to complete.

  4. Processing of the actual "Job" (called by parent and children) happens asynchronously as the calling method works asynchronously - therefore a "new thread" is not required (Task.StartNew()).

Here is the sample code that I am using to demonstrate the scenario -

public void Process()
{
    WebJob[] jobs = CreateWebJobs(); // dummy jobs

    // first level 
    Parallel.ForEach(jobs,
                new ParallelOptions { MaxDegreeOfParallelism = 2 }, // parallelism hardcoded for simplicity
                (job) => ExecuteJob(job));
}

private void ExecuteJob(WebJob job, [CallerMemberName] string memberName = "")
{
    Console.ForegroundColor = ConsoleColor.DarkYellow;
    Console.WriteLine("Caller> {0} :: {1} Job> {2} :: {3} Thread> {4}", memberName, "\t", job.Name, "\t", Thread.CurrentThread.ManagedThreadId);

    Task t = GetDataAsync(job);
    t.Wait(); // needed such that parent response is received before children start over (?).


    if (job.Children != null)
    {
        job.Children.ToList().ForEach((r) =>
        {
            r.ParentResponse = job.Response; // Children need parent's response
            ExecuteJob(r);
        });
    }
}

private async Task GetDataAsync(WebJob j)
{
    // This is just test code. Ideally it would be an external call to some "async" method
    await Task.Delay(1000);
    j.Response = string.Format("{0} complete", j.Name);
    Console.ForegroundColor = ConsoleColor.Cyan;
    Console.WriteLine("parentResp>> {0} :: {1} Job>> {2} :: {3} Thread>> {4}", j.ParentResponse, "\t", j.Name, "\t", Thread.CurrentThread.ManagedThreadId);
    Console.WriteLine("--------------");
}

private WebJob[] CreateWebJobs()
{
    return new WebJob[] {
        new WebJob() { Id=1, Name = "Job1", ExecURL = "http://url1", 
            Children = new WebJob[] 
            {
                new WebJob() 
                { 
                    Id=2, Name = "Job2", ExecURL = "http://url2", 
                    Children = new WebJob[] 
                    {
                        new WebJob() { Id=4, Name = "Job4", ExecURL = "http://url4" }
                    }
                },
                new WebJob() 
                { 
                    Id=3, Name = "Job3", ExecURL = "http://url3" 
                }
            }
        },
        new WebJob() { Id=5, Name = "Job5", ExecURL = "http://url5"}                
    };
}

This works ok, but I am not convinced if this recursive async pattern is an efficient approach. I was thinking to avoid t.Wait(). I have tried ContinueWith on t which appears no different in my understanding, I also read about ForEachAsync pattern and was wondering if that would be a fit. This solution would eventually be an ASP.NET Web API service. Any thoughts on this recursive async pattern?

Upvotes: 3

Views: 1220

Answers (2)

Douglas
Douglas

Reputation: 54877

If GetDataAsync is the only blocking operation that you have, then you can use asynchronous programming throughout, avoiding the need for Parallel.ForEach calls or blocking Wait calls.

public async Task Process()
{
    WebJob[] jobs = CreateWebJobs(); // dummy jobs

    await Task.WhenAll(jobs.Select(ExecuteJob));
}

private async Task ExecuteJob(WebJob job, [CallerMemberName] string memberName = "")
{
    Console.ForegroundColor = ConsoleColor.DarkYellow;
    Console.WriteLine("Caller> {0} :: {1} Job> {2} :: {3} Thread> {4}", memberName, "\t", job.Name, "\t", Thread.CurrentThread.ManagedThreadId);

    await GetDataAsync(job);

    if (job.Children != null)
    {
        var childTasks = job.Children.Select(r =>
        {
            r.ParentResponse = job.Response;
            return ExecuteJob(r);
        });

        await Task.WhenAll(childTasks);
    }
}

Edit: If the top-level method should block (rather than risk having consumers fire-and-forget), do:

public void Process()
{
    WebJob[] jobs = CreateWebJobs(); // dummy jobs

    Task.WaitAll(jobs.Select(ExecuteJob));
}

Upvotes: 4

Stephen Cleary
Stephen Cleary

Reputation: 456437

Since your core is asynchronous, you shouldn't be using parallel or multithreading at all. What you want is concurrency without parallelism - that is, asynchronous concurrency, usually done with Task.WhenAll.

This is doubly true since you're planning to deploy to ASP.NET, where parallelism can significantly reduce your scalability.

public async Task ProcessAsync()
{
  WebJob[] jobs = CreateWebJobs();

  await Task.WhenAll(jobs.Select(x => ExecuteJobAsync(x)));
}

private async Task ExecuteJobAsync(WebJob job, [CallerMemberName] string memberName = "")
{
  Console.ForegroundColor = ConsoleColor.DarkYellow;
  Console.WriteLine("Caller> {0} :: {1} Job> {2} :: {3} Thread> {4}", memberName, "\t", job.Name, "\t", Thread.CurrentThread.ManagedThreadId);

  await GetDataAsync(job);
  if (job.Children != null)
  {
    var childTasks = job.Children.Select(async x =>
    {
      x.ParentResponse = job.Response; // Children need parent's response
      await ExecuteJobAsync(x);
    });
    await Task.WhenAll(childTasks);
  }
}

Upvotes: 2

Related Questions