user7127000

Reputation: 3233

Is there a way to combine LINQ and async

Basically I have a procedure like

var results = await Task.WhenAll(
    from input in inputs
    select Task.Run(async () => await InnerMethodAsync(input))
);
.
.
.
private static async Task<Output> InnerMethodAsync(Input input)
{
    var x = await Foo(input);
    var y = await Bar(x);
    var z = await Baz(y);
    return z;
}

and I'm wondering whether there's a fancy way to combine this into a single LINQ query that's like an "async stream" (best way I can describe it).

Upvotes: 6

Views: 12954

Answers (3)

Enigmativity

Reputation: 117037

Try using Microsoft's Reactive Framework. Then you can do this:

IObservable<Output> query =
    from input in inputs.ToObservable()
    from x in Observable.FromAsync(() => Foo(input))
    from y in Observable.FromAsync(() => Bar(x))
    from z in Observable.FromAsync(() => Baz(y))
    select z;

Output[] results = await query.ToArray();

Simple.

Just install the "System.Reactive" NuGet package and add using System.Reactive.Linq; to your code.

Upvotes: 2

John Wu

Reputation: 52240

When you use LINQ, there are generally two parts to it: creation and iteration.

Creation:

var query = list.Select( a => a.Name);

These calls are always synchronous. But this code doesn't do much more than create an object that exposes an IEnumerable. The actual work isn't done till later, due to a pattern called deferred execution.
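
To make deferred execution concrete, here is a minimal sketch. The class name DeferredExecutionDemo and the counter are made up for illustration; the counter simply records how often the selector has run before and after iteration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical demo class, not part of the question's code.
public static class DeferredExecutionDemo
{
    // Returns how many times the selector had run before and after iteration.
    public static (int beforeIteration, int afterIteration) Run()
    {
        var names = new List<string> { "a", "b", "c" };
        int calls = 0;

        // Creation: this only builds the query object; the lambda has not run yet.
        IEnumerable<string> query = names.Select(n => { calls++; return n.ToUpperInvariant(); });
        int before = calls;

        // Iteration: ToList() pulls every item, invoking the lambda once per element.
        _ = query.ToList();

        return (before, calls);
    }
}
```

Calling DeferredExecutionDemo.Run() should give (0, 3): nothing runs at creation, and the selector runs once per element during iteration.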

Iteration:

var results = query.ToList();

This code takes the enumerable and gets the value of each item, which typically involves invoking your callback delegates (in this case, a => a.Name). This is the part that is potentially expensive and could benefit from asynchrony, e.g. if your callback is something like async a => await httpClient.GetByteArrayAsync(a).

So it's the iteration part that we're interested in, if we want to make it async.

The issue here is that ToList() (like most of the other methods that force iteration, such as Any() or Last()) is not an asynchronous method, so your callback delegate will be invoked synchronously, and you’ll end up with a list of tasks instead of the data you want.
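
As a rough illustration of that problem, the sketch below uses a hypothetical DoubleAsync method as a stand-in for an async callback. Note the element type of the list that ToList() produces.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical demo class; DoubleAsync stands in for any async callback.
public static class TaskListDemo
{
    private static async Task<int> DoubleAsync(int value)
    {
        await Task.Delay(10);   // simulate asynchronous work
        return value * 2;
    }

    public static List<Task<int>> Run()
    {
        var inputs = new[] { 1, 2, 3 };

        // ToList() invokes the selector synchronously. Each call returns a
        // Task<int> that is still in flight, so the list holds tasks, not ints.
        return inputs.Select(i => DoubleAsync(i)).ToList();
    }
}
```

The values only become available once each task in the returned list has been awaited.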

We can get around that with a piece of code like this:

using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class ExtensionMethods
{
    public static async Task<List<T>> ToListAsync<T>(this IEnumerable<Task<T>> source)
    {
        var tasks = source.ToList();   // Force LINQ to iterate and create all the tasks. Tasks returned by async methods are already running.
        var results = new List<T>();   // Create a list to hold the results (not the tasks)
        foreach (var task in tasks)
        {
            results.Add(await task);   // Await each task in order and add its result to the list
        }
        return results;
    }
}

With this extension method, we can rewrite your code:

var results = await inputs.Select( async i => await InnerMethodAsync(i) ).ToListAsync();

That should give you the async behavior you're looking for and, unlike your example, avoids creating thread pool tasks.

Note: If you are using LINQ-to-entities, the expensive part (the data retrieval) isn't exposed to you. For LINQ-to-entities, you'd want to use the ToListAsync() that comes with Entity Framework instead.

Try it out and see the timings in my demo on DotNetFiddle.

Upvotes: 9

StuartLC

Reputation: 107247

A rather obvious answer, but you have just used LINQ and async together - you're using LINQ's select to project, and start, a bunch of async Tasks, and then await on the results, which provides an asynchronous parallelism pattern.

Although you've likely just provided a sample, there are a couple of things to note in your code (I've switched to lambda syntax, but the same principles apply):

  • Since there's basically zero CPU-bound work on each Task before the first await (i.e. no work done before var x = await Foo(input);), there's no real reason to use Task.Run here.
  • And since there's no work to be done in the lambda after the call to InnerMethodAsync, you don't need to wrap the InnerMethodAsync calls in an async lambda (but be wary of doing this inside a using block, where an IDisposable can be disposed before the returned task completes)

That is, you can just select the Task returned from InnerMethodAsync and await them all with Task.WhenAll.

var tasks = inputs
    .Select(input => InnerMethodAsync(input)); // or just .Select(InnerMethodAsync)

var results = await Task.WhenAll(tasks);
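
On the IDisposable caveat in the second bullet above: the sketch below shows what can go wrong when the await is dropped inside a using block. The Resource class and both method names are invented for this illustration and are not part of the question's code.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical demo class, for illustration only.
public static class ElidedAwaitDemo
{
    // Made-up resource that records whether it has been disposed.
    private sealed class Resource : IDisposable
    {
        public bool Disposed { get; private set; }
        public void Dispose() => Disposed = true;

        public async Task<bool> WasDisposedDuringUseAsync()
        {
            await Task.Delay(10);   // simulate asynchronous work
            return Disposed;        // checked only after the work completes
        }
    }

    // No await: the using block exits (and disposes) as soon as the task is returned.
    public static Task<bool> WithoutAwait()
    {
        using (var resource = new Resource())
        {
            return resource.WasDisposedDuringUseAsync();
        }
    }

    // With await: disposal is postponed until the asynchronous work has finished.
    public static async Task<bool> WithAwait()
    {
        using (var resource = new Resource())
        {
            return await resource.WasDisposedDuringUseAsync();
        }
    }
}
```

Awaiting WithoutAwait() should yield true (the resource was disposed while still in use), whereas WithAwait() should yield false. Neither the question's lambda nor InnerMethodAsync uses a using block, so dropping the async lambda there is safe.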

More complex patterns are possible with asynchrony and LINQ, but rather than reinventing the wheel, you should have a look at Reactive Extensions and the TPL Dataflow library, which have many building blocks for complex flows.

Upvotes: 3
