Gun

Reputation: 521

ArgumentOutOfRangeException on a ForEachAsync using Partitioner

For context, the application runs on a server where about 20 other apps also do some multithreading, but this process is used by only 2 apps on the server. I have never had this kind of error in the other apps, and they all use the ForEachAsync method. In this particular app I had to add some multithreading, and I sometimes get this error when I call ForEachAsync:

System.ArgumentOutOfRangeException: Specified argument was out of the range of valid values.
Parameter name: partitionCount
   at System.Collections.Concurrent.Partitioner.DynamicPartitionerForIEnumerable`1.GetOrderablePartitions(Int32 partitionCount)
   at System.Collections.Concurrent.OrderablePartitioner`1.GetPartitions(Int32 partitionCount)
   at Common.AsyncHelper.ForEachAsync[T](IEnumerable`1 source, Func`2 taskSelector, Int32 maxParallelism) in ...\AsyncHelper.cs:line 15

Here is the method:

public static Task ForEachAsync<T>(this IEnumerable<T> source, Func<T, Task> taskSelector, int maxParallelism)
{
    return Task.WhenAll(
        from partition in Partitioner.Create(source).GetPartitions(maxParallelism)
        select Task.Run(async delegate {
            using (partition)
                while (partition.MoveNext())
                    await taskSelector(partition.Current);
        }));
}

And here is how I use it:

int parallel = list.Count() < 8 ? list.Count() : 8;

await list.ForEachAsync(async a => await Process(param1, param2),parallel);

Am I using too much parallelism? Edit: it looks like the empty list was the issue.

Here is a minimal, working example.

This is my AsyncHelper:

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace Common
{
    public static class AsyncHelper
    {
        public static Task ForEachAsync<T>(this IEnumerable<T> source, Func<T, Task> taskSelector, int maxParallelism)
        {
            return Task.WhenAll(
                from partition in Partitioner.Create(source).GetPartitions(maxParallelism)
                select Task.Run(async delegate {
                    using (partition)
                        while (partition.MoveNext())
                            await taskSelector(partition.Current);
                }));
        }        
    }
}

What I want to do is process a list and store the results in another list, with a maximum parallelism of 8:

var temp = new ConcurrentBag<TempResponse>();
int parallel = 1;
if(someList.Any(c => c.Valid))
    parallel = someList.Count(c => c.Valid) < 8 ? someList.Count(c => c.Valid)  : 8;

await someList.ForEachAsync(async a => temp.Add(await Process(a.Condition, a.State, a.Name)),parallel);

Upvotes: 2

Views: 604

Answers (1)

Evk

Reputation: 101493

This error just means you are passing 0 as maxParallelism to your function.

int parallel = list.Count() < 8 ? list.Count() : 8;

This can be zero if the target list is empty, and zero is an invalid value for the Partitioner.Create(...).GetPartitions() call.

So just check whether the list is empty and, if it is, do nothing (there is no reason to call ForEachAsync on it).
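One way to apply that check is inside the helper itself, so that every caller is protected. The following is only a sketch under a few assumptions: the names AsyncHelperSketch and ForEachAsyncSafe are hypothetical (they are not in the question), Task.CompletedTask assumes .NET Framework 4.6 or later, and clamping the partition count to at least 1 is a design choice rather than anything the original code does.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical variant of the question's helper; the names are illustrative only.
public static class AsyncHelperSketch
{
    public static Task ForEachAsyncSafe<T>(this IEnumerable<T> source, Func<T, Task> taskSelector, int maxParallelism)
    {
        if (source == null) throw new ArgumentNullException(nameof(source));
        if (taskSelector == null) throw new ArgumentNullException(nameof(taskSelector));

        // Materialize once so the item count is known without enumerating the source twice.
        var items = source as IList<T> ?? source.ToList();

        // Empty input: nothing to do, and GetPartitions is never reached.
        if (items.Count == 0)
            return Task.CompletedTask;

        // Keep the partition count between 1 and the number of items (assumption:
        // a non-positive maxParallelism is treated as 1 instead of being rejected).
        int degree = Math.Max(1, Math.Min(maxParallelism, items.Count));

        return Task.WhenAll(
            Partitioner.Create(items).GetPartitions(degree).Select(partition => Task.Run(async () =>
            {
                using (partition)
                    while (partition.MoveNext())
                        await taskSelector(partition.Current);
            })));
    }
}
```

With a guard like this, the call site can pass 8 directly, for example await list.ForEachAsyncSafe(a => Process(param1, param2), 8), without computing the count first. If you would rather keep the original helper unchanged, the equivalent caller-side check is to skip the call when the list is empty.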

Upvotes: 2
