mnj

Reputation: 3413

Async LINQ - not lazy? Multithreaded?

I have the following code:

var things = await GetDataFromApi(cancellationToken);

var builder = new StringBuilder(JsonSerializer.Serialize(things));

await things
    .GroupBy(x => x.Category)
    .ToAsyncEnumerable()
    .SelectManyAwaitWithCancellation(async (category, ct) =>
    {
        var thingsWithColors = await _colorsApiClient.GetColorsFor(category.Select(thing => thing.Name).ToList(), ct);
        
        return category
            .Select(thing => ChooseBestColor(thingsWithColors))
            .ToAsyncEnumerable();
    })
    .ForEachAsync(thingAndColor =>
    {
        Console.WriteLine(Thread.CurrentThread.ManagedThreadId); // prints different IDs
        builder.Replace(thingAndColor.Thing, $"{thingAndColor.Color} {thingAndColor.Thing}");
    }, cancellationToken);

It uses System.Linq.Async and I find it difficult to understand. In "classic"/synchronous LINQ, the whole thing would get executed only when I call ToList() or ToArray() on it. In the example above, there is no such call, but the lambdas get executed anyway. How does it work?

The other concern I have is about multi-threading. I have heard many times that async != multithreading. Then how is it possible that Console.WriteLine(Thread.CurrentThread.ManagedThreadId); prints various IDs? Some of the IDs get printed multiple times, but overall there are about 5 thread IDs in the output. None of my code creates any threads explicitly. It's all async-await. StringBuilder does not support multi-threading, and I'd like to understand whether the implementation above is valid.

Please ignore the algorithm of my code; it does not really matter, it's just an example. What matters is the usage of System.Linq.Async.

Upvotes: 3

Views: 509

Answers (2)

Theodor Zoulias

Reputation: 43484

The System.Linq.Async package, like the rest of the dotnet/reactive repository, is currently a semi-abandoned project. Issues are piling up on GitHub, and nobody has answered them officially for almost a year. There is no published documentation apart from the XML comments above each method in the source code. You can't really use this library without studying the source, which is generally easy to do because the code is short, readable, and honestly doesn't do too much. The functionality it offers is similar to that of System.Linq, with the main differences being that the input is IAsyncEnumerable<T> instead of IEnumerable<T>, and that the delegates can return values wrapped in ValueTask<T>.

With the exception of a few operators like Merge (and only one of its overloads), System.Linq.Async doesn't introduce concurrency. The asynchronous operations are invoked one at a time, and each is awaited before the next one is invoked. SelectManyAwaitWithCancellation is not one of the exceptions: the selector is invoked sequentially for each element, and each resulting IAsyncEnumerable<TResult> is enumerated sequentially, with its values yielded one after the other. So it's unlikely to create thread-safety issues.

The ForEachAsync operator is just a substitute for a standard await foreach loop. It was included in the library at a time when C# had no language support for await foreach (before C# 8). I would recommend against using this operator, because its resemblance to the newer Parallel.ForEachAsync API could create confusion. Here is the comment found in the source code of the ForEachAsync operator:
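To make the sequential behavior concrete, here is a minimal sketch written with plain C# async iterators, so it needs no NuGet package. It is not the library's actual implementation: SelectManySequential, FakeSelector, Groups and Items are hypothetical names invented for this illustration, and FakeSelector merely simulates an API call with Task.Delay. The counter records how many selectors are ever in flight at the same time.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

int running = 0;      // selectors currently in flight
int maxRunning = 0;   // highest value 'running' ever reached
var results = new List<string>();

// Hypothetical source of groups.
async IAsyncEnumerable<int> Groups()
{
    for (int g = 1; g <= 3; g++)
    {
        await Task.Yield();
        yield return g;
    }
}

// Hypothetical inner sequence produced for each group.
async IAsyncEnumerable<string> Items(int group)
{
    for (int i = 0; i < 2; i++)
    {
        await Task.Yield();
        yield return $"group {group}, item {i}";
    }
}

// Hypothetical selector; Task.Delay stands in for an API call.
async ValueTask<IAsyncEnumerable<string>> FakeSelector(int group)
{
    int now = Interlocked.Increment(ref running);
    maxRunning = Math.Max(maxRunning, now);
    await Task.Delay(10);
    Interlocked.Decrement(ref running);
    return Items(group);
}

// Sketch of a sequential SelectMany: each selector call is awaited, and its
// inner sequence fully enumerated, before the next source element is pulled.
async IAsyncEnumerable<TResult> SelectManySequential<TSource, TResult>(
    IAsyncEnumerable<TSource> source,
    Func<TSource, ValueTask<IAsyncEnumerable<TResult>>> selector)
{
    await foreach (var element in source)
    {
        var inner = await selector(element);
        await foreach (var item in inner)
            yield return item;
    }
}

await foreach (var item in SelectManySequential<int, string>(Groups(), FakeSelector))
    results.Add(item);

Console.WriteLine($"items: {results.Count}, max selectors in flight: {maxRunning}");
// items: 6, max selectors in flight: 1
```

Because the selector never overlaps with itself or with the consumer, a non-thread-safe object such as a StringBuilder is only ever touched by one thread at a time in a pipeline shaped like this.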

// REVIEW: Once we have C# 8.0 language support, we may want to do away with these
//         methods. An open question is how to provide support for cancellation,
//         which could be offered through WithCancellation on the source. If we still
//         want to keep these methods, they may be a candidate for
//         System.Interactive.Async if we consider them to be non-standard
//         (i.e. IEnumerable<T> doesn't have a ForEach extension method either).
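As a rough illustration of that recommendation, the loop from the question could be written with await foreach and WithCancellation instead of ForEachAsync. This is a sketch under stated assumptions: GetThingsWithColors is a hypothetical stand-in for the query in the question, the sample data is invented, and attributes on local-function parameters require C# 9 or later.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

using var cts = new CancellationTokenSource();
var builder = new StringBuilder("apple banana");

// Hypothetical source standing in for the query from the question.
// [EnumeratorCancellation] lets the token supplied through WithCancellation
// flow into the iterator body.
async IAsyncEnumerable<(string Thing, string Color)> GetThingsWithColors(
    [EnumeratorCancellation] CancellationToken ct = default)
{
    yield return ("apple", "red");
    await Task.Delay(10, ct);   // simulated I/O that observes cancellation
    yield return ("banana", "yellow");
}

// Plain language construct instead of the ForEachAsync operator.
await foreach (var thingAndColor in GetThingsWithColors().WithCancellation(cts.Token))
{
    builder.Replace(thingAndColor.Thing, $"{thingAndColor.Color} {thingAndColor.Thing}");
}

Console.WriteLine(builder); // red apple yellow banana
```

The loop body runs once per element, in order, which makes the sequential nature of the code visible at the call site and avoids any confusion with Parallel.ForEachAsync.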

Upvotes: 3

JonasH

Reputation: 36341

ForEachAsync has a similar effect to ToList/ToArray, since it forces evaluation of the entire sequence.
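The deferred execution can be seen in a small sketch that uses only built-in async iterators. Numbers is a hypothetical source made up for this example, and await foreach plays the role of the consuming operation (the same role ForEachAsync plays in the question).

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

var log = new List<string>();

// Hypothetical lazy source: the body does not start until enumeration begins.
async IAsyncEnumerable<int> Numbers()
{
    for (int i = 1; i <= 2; i++)
    {
        log.Add($"producing {i}");
        await Task.Yield();
        yield return i;
    }
}

var query = Numbers();          // builds the sequence; nothing has run yet
log.Add("query built");

await foreach (var n in query)  // enumeration starts here
    log.Add($"consumed {n}");

Console.WriteLine(string.Join(" | ", log));
// query built | producing 1 | consumed 1 | producing 2 | consumed 2
```

Elements are produced on demand, one at a time, and only because something is consuming the sequence.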

By default, code after an await continues on the captured context (the SynchronizationContext, when one is present). If the code runs on the UI thread, it will continue running on the UI thread. If it runs on a background thread, it will continue on a background thread, but not necessarily the same one.

However, none of your code should run in parallel. That does not by itself make it thread safe: some memory barriers are probably needed to ensure that writes made on one thread are visible to the next, but I would assume these barriers are issued by the framework code itself.
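A small console sketch can show both points: continuations may land on different thread-pool threads, yet the iterations never overlap. It assumes a plain console application, which normally has no SynchronizationContext. The thread IDs vary from run to run, so no expected output is given.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// In a typical console app this prints False, so continuations are
// scheduled on the thread pool rather than on one dedicated thread.
Console.WriteLine($"SynchronizationContext present: {SynchronizationContext.Current != null}");

int completed = 0;   // plain int: only one iteration runs at a time

for (int i = 0; i < 5; i++)
{
    int before = Environment.CurrentManagedThreadId;
    await Task.Delay(20);                            // continuation may resume on another thread
    int after = Environment.CurrentManagedThreadId;
    completed++;
    Console.WriteLine($"iteration {i}: thread {before} -> {after}");
}

Console.WriteLine($"completed: {completed}");
```

Each iteration starts only after the previous await has finished, so the different thread IDs indicate that the work hops between threads, not that it runs concurrently.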

Upvotes: 5
