Theodor Zoulias

Reputation: 43563

How to chunkify an IEnumerable<T>, without losing/discarding items in case of failure?

I have a producer-consumer scenario where the producer is an enumerable sequence of items (IEnumerable<Item>). I want to process these items in chunks/batches of 10 items each. So I decided to use the new (.NET 6) Chunk LINQ operator, as suggested in this question: Create batches in LINQ.

My problem is that sometimes the producer fails, and in this case the consumer of the chunkified sequence receives the error without first receiving a chunk with the last items that were produced before the error. So if for example the producer generates 15 items and then fails, the consumer will get a chunk with the items 1-10 and then will get an exception. The items 11-15 will be lost! Here is a minimal example that demonstrates this undesirable behavior:

static IEnumerable<int> Produce()
{
    int i = 0;
    while (true)
    {
        i++;
        Console.WriteLine($"Producing #{i}");
        yield return i;
        if (i == 15) throw new Exception("Oops!");
    }
}

// Consume
foreach (int[] chunk in Produce().Chunk(10))
{
    Console.WriteLine($"Consumed: [{String.Join(", ", chunk)}]");
}

Output:

Producing #1
Producing #2
Producing #3
Producing #4
Producing #5
Producing #6
Producing #7
Producing #8
Producing #9
Producing #10
Consumed: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
Producing #11
Producing #12
Producing #13
Producing #14
Producing #15
Unhandled exception. System.Exception: Oops!
   at Program.<Main>g__Produce|0_0()+MoveNext()
   at System.Linq.Enumerable.ChunkIterator[TSource](IEnumerable`1 source, Int32 size)+MoveNext()
   at Program.Main()

Online demo.

The desirable behavior would be to get a chunk with the values [11, 12, 13, 14, 15] before getting the exception.

My question is: Is there any way to configure the Chunk operator so that it prioritizes emitting data over exceptions? If not, how can I implement a custom LINQ operator, named for example ChunkNonDestructive, with the desirable behavior?

public static IEnumerable<TSource[]> ChunkNonDestructive<TSource>(
    this IEnumerable<TSource> source, int size);

Note: Apart from the System.Linq.Chunk operator, I also experimented with the Buffer operator from the System.Interactive package, as well as the Batch operator from the MoreLinq package. Apparently they all behave the same (destructively).


Update: Here is the desirable output of the above example:

Producing #1
Producing #2
Producing #3
Producing #4
Producing #5
Producing #6
Producing #7
Producing #8
Producing #9
Producing #10
Consumed: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
Producing #11
Producing #12
Producing #13
Producing #14
Producing #15
Consumed: [11, 12, 13, 14, 15]
Unhandled exception. System.Exception: Oops!
   at Program.<Main>g__Produce|0_0()+MoveNext()
   at System.Linq.Enumerable.ChunkIterator[TSource](IEnumerable`1 source, Int32 size)+MoveNext()
   at Program.Main()

The difference is the line Consumed: [11, 12, 13, 14, 15], that is not present in the actual output.

Upvotes: 4

Views: 1606

Answers (4)

Theodor Zoulias

Reputation: 43563

I was inspired by StriplingWarrior's answer, which is based on an idea that I didn't initially understand. The idea is to reuse the existing Chunk implementation, and propagate the exception around it instead of through it. Based on this idea I wrote a generic method DeferErrorUntilCompletion that robustifies¹ all kinds of LINQ operators, or combinations of operators, according to this rule:

In case the input sequence fails, the error is propagated after yielding all the elements of the output sequence.

private static IEnumerable<TOutput> DeferErrorUntilCompletion<TInput, TOutput>(
    IEnumerable<TInput> input,
    Func<IEnumerable<TInput>, IEnumerable<TOutput>> conversion)
{
    Task errorContainer = null;
    IEnumerable<TInput> InputIterator()
    {
        using var enumerator = input.GetEnumerator();
        while (true)
        {
            TInput item;
            try
            {
                if (!enumerator.MoveNext()) break;
                item = enumerator.Current;
            }
            catch (Exception ex)
            {
                errorContainer = Task.FromException(ex);
                break;
            }
            yield return item;
        }
    }
    IEnumerable<TOutput> output = conversion(InputIterator());
    foreach (TOutput item in output) yield return item;
    errorContainer?.GetAwaiter().GetResult();
}

Then I used the DeferErrorUntilCompletion method to implement the ChunkNonDestructive operator like this:

/// <summary>
/// Splits the elements of a sequence into chunks of the specified size.
/// In case the sequence fails and there are buffered elements, a last chunk
/// that contains these elements is emitted before propagating the error.
/// </summary>
public static IEnumerable<TSource[]> ChunkNonDestructive<TSource>(
    this IEnumerable<TSource> source, int size)
{
    ArgumentNullException.ThrowIfNull(source);
    if (size < 1) throw new ArgumentOutOfRangeException(nameof(size));
    return DeferErrorUntilCompletion(source, s => s.Chunk(size));
}

Online example.
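A self-contained harness for checking the behavior end to end might look like the following sketch. It copies the question's Produce method (without the logging) and the two methods above; the class name ChunkNonDestructiveDemo and the Run helper are hypothetical, and .NET 6 or later is assumed for Chunk and ArgumentNullException.ThrowIfNull.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class ChunkNonDestructiveDemo // hypothetical harness class
{
    // The question's producer: yields 1..15, then throws.
    static IEnumerable<int> Produce()
    {
        int i = 0;
        while (true)
        {
            i++;
            yield return i;
            if (i == 15) throw new Exception("Oops!");
        }
    }

    // DeferErrorUntilCompletion, copied from the answer above.
    static IEnumerable<TOutput> DeferErrorUntilCompletion<TInput, TOutput>(
        IEnumerable<TInput> input,
        Func<IEnumerable<TInput>, IEnumerable<TOutput>> conversion)
    {
        Task errorContainer = null;
        IEnumerable<TInput> InputIterator()
        {
            using var enumerator = input.GetEnumerator();
            while (true)
            {
                TInput item;
                try
                {
                    if (!enumerator.MoveNext()) break;
                    item = enumerator.Current;
                }
                catch (Exception ex)
                {
                    errorContainer = Task.FromException(ex); // capture, end the input early
                    break;
                }
                yield return item;
            }
        }
        IEnumerable<TOutput> output = conversion(InputIterator());
        foreach (TOutput item in output) yield return item;
        errorContainer?.GetAwaiter().GetResult(); // rethrow after the last output element
    }

    public static IEnumerable<TSource[]> ChunkNonDestructive<TSource>(
        this IEnumerable<TSource> source, int size)
    {
        ArgumentNullException.ThrowIfNull(source);
        if (size < 1) throw new ArgumentOutOfRangeException(nameof(size));
        return DeferErrorUntilCompletion(source, s => s.Chunk(size));
    }

    // Hypothetical helper: returns the consumed chunks and the message of the error that followed them.
    public static (List<string> Chunks, string Error) Run()
    {
        var chunks = new List<string>();
        try
        {
            foreach (int[] chunk in Produce().ChunkNonDestructive(10))
                chunks.Add($"[{string.Join(", ", chunk)}]");
        }
        catch (Exception ex)
        {
            return (chunks, ex.Message);
        }
        return (chunks, null);
    }
}
```

Run should report the chunks [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] and [11, 12, 13, 14, 15] followed by the "Oops!" error, which matches the desirable output from the question.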

The implementation uses a Task for capturing the error, which is later rethrown without losing the original stack trace.
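The reason a Task is a convenient container: retrieving the error through GetAwaiter().GetResult() rethrows the stored exception object itself, with its original stack trace preserved, while Task.Wait() would wrap it in an AggregateException. A minimal sketch contrasting the two (class and method names are illustrative only):

```csharp
using System;
using System.Threading.Tasks;

public static class TaskErrorContainerDemo // illustrative names
{
    // Throws and catches an exception, so that it carries a real stack trace.
    static Exception Capture()
    {
        try { throw new InvalidOperationException("Oops!"); }
        catch (Exception ex) { return ex; }
    }

    // GetAwaiter().GetResult(): the very same exception object comes out, unwrapped.
    public static bool GetResultRethrowsOriginal()
    {
        Exception original = Capture();
        Task container = Task.FromException(original);
        try { container.GetAwaiter().GetResult(); }
        catch (Exception ex) { return ReferenceEquals(ex, original); }
        return false;
    }

    // Wait(): the same exception arrives wrapped inside an AggregateException.
    public static bool WaitWrapsInAggregate()
    {
        Exception original = Capture();
        Task container = Task.FromException(original);
        try { container.Wait(); }
        catch (AggregateException ae) { return ReferenceEquals(ae.InnerException, original); }
        return false;
    }
}
```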

Deferring the propagation of the error opens an interesting possibility: the consumer of the deferred sequence might abandon the enumeration prematurely, for example by breaking out of the loop or by suffering an exception, while an error is already captured inside the errorContainer. This possibility is handled by propagating the unobserved error through the TaskScheduler.UnobservedTaskException event, which is raised when the faulted Task is eventually garbage-collected. Other options for handling this scenario would be to rethrow the error during the Dispose of the deferred enumerator, or simply to suppress the error. Suppressing was implemented in the 3rd revision of this answer, by using an ExceptionDispatchInfo instead of a Task as container for the error. Throwing on Dispose has other problems that are discussed in this question.
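For comparison, the suppressing alternative could be sketched as below, with an ExceptionDispatchInfo in place of the Task. This is not the code of the 3rd revision, just an illustration; the method name DeferErrorUntilCompletionSuppressing and the FailAfterThree producer are hypothetical. When the consumer breaks after the last chunk, the captured error is never thrown and nothing reports it, since ExceptionDispatchInfo has no counterpart of the UnobservedTaskException event.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.ExceptionServices;

public static class DeferSuppressingSketch
{
    // Hypothetical variant: the error is kept in an ExceptionDispatchInfo instead of a Task.
    // If the enumeration is abandoned before completion, the error is silently dropped.
    public static IEnumerable<TOutput> DeferErrorUntilCompletionSuppressing<TInput, TOutput>(
        IEnumerable<TInput> input,
        Func<IEnumerable<TInput>, IEnumerable<TOutput>> conversion)
    {
        ExceptionDispatchInfo error = null;
        IEnumerable<TInput> InputIterator()
        {
            using var enumerator = input.GetEnumerator();
            while (true)
            {
                TInput item;
                try
                {
                    if (!enumerator.MoveNext()) break;
                    item = enumerator.Current;
                }
                catch (Exception ex)
                {
                    error = ExceptionDispatchInfo.Capture(ex); // remember the error, stop reading
                    break;
                }
                yield return item;
            }
        }
        foreach (TOutput item in conversion(InputIterator())) yield return item;
        error?.Throw(); // reached only if the consumer enumerated everything
    }

    // Hypothetical producer: yields 1, 2, 3 and then fails.
    static IEnumerable<int> FailAfterThree()
    {
        yield return 1; yield return 2; yield return 3;
        throw new InvalidOperationException("Oops!");
    }

    // Breaks right after the last chunk ([3]), when the error is already captured: nothing is thrown.
    public static List<int[]> ConsumeTwoChunksThenBreak()
    {
        var received = new List<int[]>();
        foreach (int[] chunk in DeferErrorUntilCompletionSuppressing(FailAfterThree(), s => s.Chunk(2)))
        {
            received.Add(chunk);
            if (received.Count == 2) break;
        }
        return received;
    }

    // Enumerates to the end: the captured error is rethrown after the last chunk.
    public static string ConsumeAll()
    {
        try
        {
            foreach (int[] chunk in DeferErrorUntilCompletionSuppressing(FailAfterThree(), s => s.Chunk(2))) { }
        }
        catch (InvalidOperationException ex) { return ex.Message; }
        return null;
    }
}
```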

Although there is some value in reusing an existing built-in implementation (simplicity, consistency, robustness), there are downsides too. Adding two extra enumerations on top of the core functionality could result in non-negligible overhead. Charlieface's implementation is about twice as fast as this implementation at producing chunks. So for a producer-consumer scenario with very high throughput (thousands of chunks per second), I would probably prefer Charlieface's implementation over this one.

¹ The idea that the LINQ operators need to be robustified might sound strange, or even arrogant. Please note that the context of this answer is very specific: producer-consumer scenarios. In these scenarios multiple producers and consumers might be running in parallel, occasional exceptions are to be expected, and resilience mechanisms are in place in anticipation of such exceptions, so losing messages here and there because of errors is generally something to avoid.

Upvotes: 2

StriplingWarrior

Reputation: 156554

If you preprocess your source to make it stop when it encounters an exception, then you can use Chunk() as-is.

public static class Extensions
{
    public static IEnumerable<T> UntilFirstException<T>(this IEnumerable<T> source, Action<Exception> exceptionCallback = null)
    {
        using var enumerator = source.GetEnumerator();
        while (true)
        {
            T current;
            try
            {
                if (!enumerator.MoveNext())
                {
                    break;
                }
                current = enumerator.Current;
            }
            catch (Exception e)
            {
                exceptionCallback?.Invoke(e);
                break;
            }
            yield return current;
        }
    }
}
    Exception? e = null;
    foreach (int[] chunk in Produce().UntilFirstException(thrown => e = thrown).Chunk(10))
    {
        Console.WriteLine($"Consumed: [{String.Join(", ", chunk)}]");
    }

I feel like that keeps responsibilities separated nicely. If you want a helper that throws an exception instead of having to capture it yourself, you can use this as a component to simplify writing that helper:

    public static IEnumerable<T[]> ChunkUntilFirstException<T>(this IEnumerable<T> source, int size)
    {
        Exception? e = null;
        var result = source.UntilFirstException(thrown => e = thrown).Chunk(size);
        foreach (var element in result)
        {
            yield return element;
        }
        if (e != null)
        {
            throw new InvalidOperationException("source threw an exception", e);
        }
    }

Note that this will throw a different exception than the one emitted by the producer. This lets you keep the stack trace associated with the original exception, whereas throw e would overwrite that stack trace.

You can tweak this according to your needs. If you need to catch a specific type of exception that you're expecting your producer to emit, it's easy enough to use the when contextual keyword with some pattern matching.

    try
    {
        foreach (int[] chunk in Produce().ChunkUntilFirstException(10))
        {
            Console.WriteLine($"Consumed: [{String.Join(", ", chunk)}]");
        }
    }
    catch (InvalidOperationException e) when (e.InnerException is {Message: "Oops!"})
    {
        Console.WriteLine(e.InnerException.ToString());
    }
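If you would rather have the consumer see the producer's own exception than an InvalidOperationException wrapper, ExceptionDispatchInfo can rethrow the original object while keeping its stack trace. A sketch of that variant, reusing a copy of UntilFirstException; the name ChunkUntilFirstExceptionRethrow and the Run harness are hypothetical:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.ExceptionServices;

public static class RethrowOriginalSketch
{
    // Copy of UntilFirstException from the answer above.
    public static IEnumerable<T> UntilFirstException<T>(this IEnumerable<T> source, Action<Exception> exceptionCallback = null)
    {
        using var enumerator = source.GetEnumerator();
        while (true)
        {
            T current;
            try
            {
                if (!enumerator.MoveNext()) break;
                current = enumerator.Current;
            }
            catch (Exception e)
            {
                exceptionCallback?.Invoke(e);
                break;
            }
            yield return current;
        }
    }

    // Hypothetical variant: rethrows the producer's exception itself instead of wrapping it.
    public static IEnumerable<T[]> ChunkUntilFirstExceptionRethrow<T>(this IEnumerable<T> source, int size)
    {
        ExceptionDispatchInfo captured = null;
        var result = source.UntilFirstException(e => captured = ExceptionDispatchInfo.Capture(e)).Chunk(size);
        foreach (var chunk in result) yield return chunk;
        captured?.Throw(); // same exception object, original stack trace kept
    }

    // The question's producer, without the logging.
    static IEnumerable<int> Produce()
    {
        for (int i = 1; ; i++)
        {
            yield return i;
            if (i == 15) throw new Exception("Oops!");
        }
    }

    // Hypothetical harness: counts the chunks and returns the exception that followed them.
    public static (int ChunkCount, Exception Error) Run()
    {
        int count = 0;
        try
        {
            foreach (int[] chunk in Produce().ChunkUntilFirstExceptionRethrow(10)) count++;
        }
        catch (Exception ex) { return (count, ex); }
        return (count, null);
    }
}
```

With this variant a catch clause can filter on the producer's exception type directly, without inspecting InnerException.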

Upvotes: 2

Charlieface

Reputation: 71805

You can't catch an exception, yield and then re-throw, because you can't have yield inside a catch. (For obvious reasons: once you have yielded then you are not in a catch any more.)

I think the only solution that will preserve the original exception with the original stack trace is to use ExceptionDispatchInfo.Capture.

private static IEnumerable<IList<TSource>> ChunkIterator<TSource>(this IEnumerable<TSource> source, int size)
{
    using var e = source.GetEnumerator();

    var chunk = new List<TSource>(size);
    ExceptionDispatchInfo exDispatch = null;
    try
    {
        while(true)
        {
            try
            {
                while(e.MoveNext())
                {
                    chunk.Add(e.Current);
                    if (chunk.Count == size)
                        break;
                }
            }
            catch(Exception ex)
            {
                exDispatch = ExceptionDispatchInfo.Capture(ex);
            }

            if(chunk.Count > 0)
                yield return chunk.ToArray();

            var exDispatch2 = exDispatch;
            exDispatch = null;
            exDispatch2?.Throw();

            if(chunk.Count > 0)
                chunk.Clear();
            else
                yield break;
        }
    }
    finally
    {
        exDispatch?.Throw();
    }
}

Your foreach will always receive the last chunk of items, and only throw on the next iteration.
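One consequence of the finally block is worth spelling out. The captured error is cleared only when the iterator resumes after yielding the last chunk, so a consumer that breaks out of the loop right after receiving that chunk still gets the exception, this time thrown from Dispose. The sketch below reproduces this with a copy of ChunkIterator (made public so it can be called from outside); the FailAfterThree producer and the BreakAfterLastChunk harness are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.ExceptionServices;

public static class ChunkIteratorDisposeDemo
{
    // ChunkIterator as posted above, made public for the demo.
    public static IEnumerable<IList<TSource>> ChunkIterator<TSource>(this IEnumerable<TSource> source, int size)
    {
        using var e = source.GetEnumerator();
        var chunk = new List<TSource>(size);
        ExceptionDispatchInfo exDispatch = null;
        try
        {
            while (true)
            {
                try
                {
                    while (e.MoveNext())
                    {
                        chunk.Add(e.Current);
                        if (chunk.Count == size) break;
                    }
                }
                catch (Exception ex)
                {
                    exDispatch = ExceptionDispatchInfo.Capture(ex);
                }
                if (chunk.Count > 0) yield return chunk.ToArray();
                var exDispatch2 = exDispatch;
                exDispatch = null;
                exDispatch2?.Throw();
                if (chunk.Count > 0) chunk.Clear();
                else yield break;
            }
        }
        finally
        {
            exDispatch?.Throw(); // runs on Dispose too, if the consumer stops at the last chunk
        }
    }

    // Hypothetical producer: yields 1, 2, 3 and then fails.
    static IEnumerable<int> FailAfterThree()
    {
        yield return 1; yield return 2; yield return 3;
        throw new InvalidOperationException("Oops!");
    }

    // Breaks after receiving the last chunk [3]; the error still escapes the foreach via Dispose.
    public static (int Received, string Error) BreakAfterLastChunk()
    {
        int received = 0;
        try
        {
            foreach (var chunk in FailAfterThree().ChunkIterator(2))
            {
                received++;
                if (chunk.Count == 1) break; // [3] is the last chunk before the error
            }
        }
        catch (InvalidOperationException ex) { return (received, ex.Message); }
        return (received, null);
    }
}
```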

Upvotes: 1

Blindy

Reputation: 67386

First off, a matter of semantics. There's nothing destructive in Chunk or Buffer or anything else; they just read items from a source enumerable until it's over or it throws an exception. The only destructive thing in your code is the exception you throw, which behaves as expected (i.e., it unwinds the stack out of your generator, out of the LINQ functions and into a catch in your code, if one exists).

Also it should be immediately obvious that every LINQ function will behave the same with regard to exceptions. That is simply how exceptions work, and working around them to support your use case is relatively expensive: you'll need to guard every item you pull from the source with a try/catch that swallows the exception. This, in my humble opinion, is incredibly bad design, and you'd be fired on the spot if you worked for me and did that.

With all that out of the way, writing a BadDesignChunk like that is trivial (if expensive):

public static IEnumerable<IEnumerable<TSource>> BadDesignChunk<TSource>(this IEnumerable<TSource> source, int size)
{
    Exception caughtException = default;
    var chunk = new List<TSource>(size);
    using var enumerator = source.GetEnumerator();
    
    while(true)
    {
        while(chunk.Count < size)
        {
            try
            {
                if(!enumerator.MoveNext())
                {
                    // end of the stream, send what we have and finish
                    goto end;
                }
            }
            catch(Exception ex)
            {
                // exception, send what we have and finish
                caughtException = ex;
                goto end;
            }
            
            chunk.Add(enumerator.Current);
        }
        
        // chunk full, send a copy (the buffer is cleared and reused for the next chunk)
        yield return chunk.ToArray();
        chunk.Clear();
    }
    
    end:
    if(chunk.Count > 0)
        yield return chunk.ToArray();
    if(caughtException is not null)
        // rethrow the original exception without resetting its stack trace
        System.Runtime.ExceptionServices.ExceptionDispatchInfo.Capture(caughtException).Throw();
}

See it in action here.
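A pitfall to keep in mind with any chunking iterator that yields one List<T> buffer and clears it afterwards: the chunks are valid only while the consumer processes them inside the loop. A consumer that stores them (for example with ToList()) ends up holding several references to the same list, which by then contains only the last chunk. A minimal sketch contrasting a reused buffer with per-chunk copies; both method names are hypothetical:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class ReusedBufferPitfall
{
    // Hypothetical: yields the same List instance every time, clearing it between chunks.
    public static IEnumerable<IEnumerable<int>> ChunkReusingBuffer(IEnumerable<int> source, int size)
    {
        var chunk = new List<int>(size);
        foreach (int item in source)
        {
            chunk.Add(item);
            if (chunk.Count == size)
            {
                yield return chunk; // the caller receives the buffer itself
                chunk.Clear();      // ...which is emptied as soon as the caller asks for more
            }
        }
        if (chunk.Count > 0) yield return chunk;
    }

    // Hypothetical: yields a fresh array per chunk, so stored chunks stay intact.
    public static IEnumerable<IEnumerable<int>> ChunkCopying(IEnumerable<int> source, int size)
    {
        var chunk = new List<int>(size);
        foreach (int item in source)
        {
            chunk.Add(item);
            if (chunk.Count == size)
            {
                yield return chunk.ToArray();
                chunk.Clear();
            }
        }
        if (chunk.Count > 0) yield return chunk.ToArray();
    }
}
```

With Enumerable.Range(1, 5) and size 2, storing the output of ChunkReusingBuffer gives three references to a list that contains only 5, while ChunkCopying keeps [1, 2], [3, 4] and [5].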

Upvotes: 2
