Born Under a Bad Sign

Reputation: 61

Parallel invocation of elements of an IEnumerable

I have an IEnumerable<IEnumerable<T>> method called Batch that works like

var list = new List<int>() { 1, 2, 4, 8, 10, -4, 3 }; 
var batches = list.Batch(2); 
foreach(var batch in batches)
    Console.WriteLine(string.Join(",", batch));

-->

1,2
4,8
10,-4
3

The problem I'm having is that when I try to optimize something like

foreach(var batch in batches)
    ExecuteBatch(batch);

by

Task[] tasks = batches.Select(batch => Task.Factory.StartNew(() => ExecuteBatch(batch))).ToArray();
Task.WaitAll(tasks);

or

Action[] executions = batches.Select(batch => new Action(() => ExecuteBatch(batch))).ToArray();
var options = new ParallelOptions { MaxDegreeOfParallelism = 4 };
Parallel.Invoke(options, executions);

(because ExecuteBatch is a long-running operation involving IO)

then I notice that each batch gets screwed up and contains only a single element, which is default(int). Any idea what's happening, or how to fix it?

Batch:

public static IEnumerable<IEnumerable<T>> Batch<T>(this IEnumerable<T> source, int size)
{
    for(var mover = source.GetEnumerator(); ;)
    {
        if(!mover.MoveNext())
            yield break;
        yield return LimitMoves(mover, size);
    }
}
private static IEnumerable<T> LimitMoves<T>(IEnumerator<T> mover, int limit)
{
    do yield return mover.Current;
    while(--limit > 0 && mover.MoveNext());
}

Upvotes: 5

Views: 494

Answers (1)

Rob

Reputation: 27357

As noted in the comments, your actual issue is your implementation of Batch.

This code:

for(var mover = source.GetEnumerator(); ;)
{
    if(!mover.MoveNext())
        yield break;
    yield return LimitMoves(mover, size);
}

When Batch is materialized, this code is going to continually call MoveNext() until the enumerable is exhausted. LimitMoves() uses the same iterator, and is lazily invoked. Since Batch exhausts the enumerable, LimitMoves() will never emit an item. (Actually, it will only emit default(T) since it always returns mover.Current, which will be default(T) once the enumerable is finished).
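For illustration, here is a minimal, self-contained console repro of that failure mode (the Batch/LimitMoves bodies are copied from the question; the surrounding Program scaffolding is mine). Calling ToArray() on the outer sequence exhausts the shared enumerator before any inner batch is read, so every batch collapses to a single default(int):

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

class Program
{
    // The question's Batch, reproduced as-is
    static IEnumerable<IEnumerable<T>> Batch<T>(IEnumerable<T> source, int size)
    {
        for (var mover = source.GetEnumerator(); ;)
        {
            if (!mover.MoveNext())
                yield break;
            yield return LimitMoves(mover, size);
        }
    }

    static IEnumerable<T> LimitMoves<T>(IEnumerator<T> mover, int limit)
    {
        do yield return mover.Current;
        while (--limit > 0 && mover.MoveNext());
    }

    static void Main()
    {
        var list = new List<int> { 1, 2, 4, 8, 10, -4, 3 };

        // ToArray() walks the OUTER sequence to the end without touching any
        // inner batch, so the single shared enumerator is already exhausted
        // by the time the batches are enumerated below.
        var batches = Batch(list, 2).ToArray();

        foreach (var batch in batches)
            Console.WriteLine(string.Join(",", batch));
        // Each line prints a lone "0" (default(int)) instead of a real batch,
        // because mover.Current returns default(T) after the list is exhausted.
    }
}
```

The same exhaustion happens implicitly in the question's Select(...).ToArray() over Task.Factory.StartNew, which is why the parallel versions break while the plain foreach works.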

Here's an implementation of Batch which will work when materialized (and thus when in parallel).

public static IEnumerable<IEnumerable<T>> Batch<T>(this IEnumerable<T> source, int size)
{
    var mover = source.GetEnumerator();
    var currentSet = new List<T>();
    while (mover.MoveNext())
    {
        currentSet.Add(mover.Current);
        if (currentSet.Count >= size)
        {   
            yield return currentSet;
            currentSet = new List<T>();
        }
    }
    if (currentSet.Count > 0)
        yield return currentSet;
}
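As a quick sanity check, here's how the fixed Batch behaves under the question's parallel pattern (my own snippet; the ExecuteBatch stub is hypothetical, standing in for the real IO-bound operation):

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

class Demo
{
    // Stand-in for the question's long-running IO operation
    static void ExecuteBatch(IEnumerable<int> batch) =>
        Console.WriteLine(string.Join(",", batch));

    static void Main()
    {
        var list = new List<int> { 1, 2, 4, 8, 10, -4, 3 };

        // Each batch is now an independent List<T> snapshot, so materializing
        // the outer sequence no longer corrupts the inner ones.
        Task[] tasks = list.Batch(2)
                           .Select(batch => Task.Factory.StartNew(() => ExecuteBatch(batch)))
                           .ToArray();
        Task.WaitAll(tasks);
        // Prints the batches 1,2 / 4,8 / 10,-4 / 3 (in nondeterministic order).
    }
}
```

The key difference is that each yielded batch owns its elements rather than lazily sharing one enumerator, so it's safe to enumerate the batches in any order, on any thread.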

Alternatively, you could use MoreLINQ, which comes with a Batch implementation. You can see their implementation here.

Upvotes: 4
