Reputation: 43563
I have a producer-consumer scenario where the producer is an enumerable sequence of items (IEnumerable<Item>). I want to process these items in chunks/batches of 10 items each. So I decided to use the new (.NET 6) Chunk LINQ operator, as suggested in this question: Create batches in LINQ.
My problem is that sometimes the producer fails, and in this case the consumer of the chunkified sequence receives the error without first receiving a chunk with the last items that were produced before the error. So if for example the producer generates 15 items and then fails, the consumer will get a chunk with the items 1-10 and then will get an exception. The items 11-15 will be lost! Here is a minimal example that demonstrates this undesirable behavior:
static IEnumerable<int> Produce()
{
    int i = 0;
    while (true)
    {
        i++;
        Console.WriteLine($"Producing #{i}");
        yield return i;
        if (i == 15) throw new Exception("Oops!");
    }
}

// Consume
foreach (int[] chunk in Produce().Chunk(10))
{
    Console.WriteLine($"Consumed: [{String.Join(", ", chunk)}]");
}
Output:
Producing #1
Producing #2
Producing #3
Producing #4
Producing #5
Producing #6
Producing #7
Producing #8
Producing #9
Producing #10
Consumed: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
Producing #11
Producing #12
Producing #13
Producing #14
Producing #15
Unhandled exception. System.Exception: Oops!
at Program.<Main>g__Produce|0_0()+MoveNext()
at System.Linq.Enumerable.ChunkIterator[TSource](IEnumerable`1 source, Int32 size)+MoveNext()
at Program.Main()
The desirable behavior would be to get a chunk with the values [11, 12, 13, 14, 15] before getting the exception.
My question is: Is there any way to configure the Chunk operator so that it prioritizes emitting data instead of exceptions? If not, how can I implement a custom LINQ operator, named for example ChunkNonDestructive, with the desirable behavior?
public static IEnumerable<TSource[]> ChunkNonDestructive<TSource>(
    this IEnumerable<TSource> source, int size);
Note: Apart from the Chunk operator in System.Linq, I also experimented with the Buffer operator from the System.Interactive package, as well as the Batch operator from the MoreLinq package. Apparently they all behave the same (destructively).
Update: Here is the desirable output of the above example:
Producing #1
Producing #2
Producing #3
Producing #4
Producing #5
Producing #6
Producing #7
Producing #8
Producing #9
Producing #10
Consumed: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
Producing #11
Producing #12
Producing #13
Producing #14
Producing #15
Consumed: [11, 12, 13, 14, 15]
Unhandled exception. System.Exception: Oops!
at Program.<Main>g__Produce|0_0()+MoveNext()
at System.Linq.Enumerable.ChunkIterator[TSource](IEnumerable`1 source, Int32 size)+MoveNext()
at Program.Main()
The difference is the line Consumed: [11, 12, 13, 14, 15], which is not present in the actual output.
Upvotes: 4
Views: 1606
Reputation: 43563
I was inspired by StriplingWarrior's answer, which is based on an idea that I didn't initially understand. The idea is to reuse the existing Chunk implementation, and propagate the exception around it instead of through it. Based on this idea I wrote a generic method DeferErrorUntilCompletion that robustifies¹ all kinds of LINQ operators, or combinations of operators, according to this rule:
In case the input sequence fails, the error is propagated after yielding all the elements of the output sequence.
private static IEnumerable<TOutput> DeferErrorUntilCompletion<TInput, TOutput>(
    IEnumerable<TInput> input,
    Func<IEnumerable<TInput>, IEnumerable<TOutput>> conversion)
{
    Task errorContainer = null;
    IEnumerable<TInput> InputIterator()
    {
        using var enumerator = input.GetEnumerator();
        while (true)
        {
            TInput item;
            try
            {
                if (!enumerator.MoveNext()) break;
                item = enumerator.Current;
            }
            catch (Exception ex)
            {
                errorContainer = Task.FromException(ex);
                break;
            }
            yield return item;
        }
    }
    IEnumerable<TOutput> output = conversion(InputIterator());
    foreach (TOutput item in output) yield return item;
    errorContainer?.GetAwaiter().GetResult();
}
Then I used the DeferErrorUntilCompletion method to implement the ChunkNonDestructive operator like this:
/// <summary>
/// Splits the elements of a sequence into chunks of the specified size.
/// In case the sequence fails and there are buffered elements, a last chunk
/// that contains these elements is emitted before propagating the error.
/// </summary>
public static IEnumerable<TSource[]> ChunkNonDestructive<TSource>(
    this IEnumerable<TSource> source, int size)
{
    ArgumentNullException.ThrowIfNull(source);
    if (size < 1) throw new ArgumentOutOfRangeException(nameof(size));
    return DeferErrorUntilCompletion(source, s => s.Chunk(size));
}
The implementation uses a Task for capturing the error, which is later rethrown without losing the original stack trace.
Deferring the propagation of the error opens an interesting possibility: The consumer of the deferred sequence might abandon the enumeration prematurely, for example by breaking or by suffering an exception, while an error is already captured inside the errorContainer. This possibility is handled by propagating the unobserved error through the TaskScheduler.UnobservedTaskException event. Other options for handling this scenario could be to rethrow the error during the Dispose of the deferred enumerator, or simply to suppress the error. Suppressing was implemented in the 3rd revision of this answer, by using an ExceptionDispatchInfo instead of a Task as the container for the error. Throwing on Dispose has other problems that are discussed in this question.
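For readers who prefer the "suppress" option, here is a rough sketch of how it might look with an ExceptionDispatchInfo as the container. This is my reconstruction for illustration, not the code of the 3rd revision mentioned above; the names DeferredErrors and Defer are made up, and Produce is a quiet copy of the producer from the question. If the consumer abandons the enumeration early, the captured error is simply dropped.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.ExceptionServices;

// Usage: the trailing partial chunk arrives before the error does.
try
{
    foreach (int[] chunk in DeferredErrors.Defer(DeferredErrors.Produce(), s => s.Chunk(10)))
        Console.WriteLine($"Consumed: [{string.Join(", ", chunk)}]");
}
catch (Exception ex)
{
    Console.WriteLine($"Failed: {ex.Message}");
}

static class DeferredErrors
{
    // Same shape as the producer in the question: fails after yielding 15 items.
    public static IEnumerable<int> Produce()
    {
        for (int i = 1; ; i++)
        {
            yield return i;
            if (i == 15) throw new Exception("Oops!");
        }
    }

    // Hypothetical variant: the error is rethrown only if the output is enumerated to the end.
    public static IEnumerable<TOutput> Defer<TInput, TOutput>(
        IEnumerable<TInput> input,
        Func<IEnumerable<TInput>, IEnumerable<TOutput>> conversion)
    {
        ExceptionDispatchInfo? error = null;

        IEnumerable<TInput> InputIterator()
        {
            using var enumerator = input.GetEnumerator();
            while (true)
            {
                TInput item;
                try
                {
                    if (!enumerator.MoveNext()) break;
                    item = enumerator.Current;
                }
                catch (Exception ex)
                {
                    // Remember the error and end the input sequence normally.
                    error = ExceptionDispatchInfo.Capture(ex);
                    break;
                }
                yield return item;
            }
        }

        foreach (TOutput item in conversion(InputIterator())) yield return item;
        error?.Throw(); // Rethrows with the original stack trace preserved.
    }
}
```

The trade-off is that an abandoned enumeration hides the failure completely, so this variant only makes sense when the consumer is expected to drain the sequence or genuinely doesn't care about errors after it stops.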
Although there is some value in reusing an existing built-in implementation (simplicity, consistency, robustness), there are downsides too. Adding two extra enumerations on top of the core functionality could result in non-negligible overhead. Charlieface's implementation is about twice as fast as this implementation at producing chunks. So for a producer-consumer scenario with very high throughput (thousands of chunks per second), I would probably prefer Charlieface's implementation over this one.
¹ The idea that the LINQ operators need to be robustified might sound strange, or even arrogant. Please note that the context of this answer is very specific: producer-consumer scenarios. In these scenarios multiple producers and consumers might be running in parallel, occasional exceptions are to be expected, and resilience mechanisms are in place in anticipation of such exceptions. Losing messages here and there because of errors is generally something to avoid.
Upvotes: 2
Reputation: 156554
If you preprocess your source to make it stop when it encounters an exception, then you can use Chunk() as-is.
public static class Extensions
{
    public static IEnumerable<T> UntilFirstException<T>(this IEnumerable<T> source, Action<Exception> exceptionCallback = null)
    {
        using var enumerator = source.GetEnumerator();
        while (true)
        {
            T current;
            try
            {
                if (!enumerator.MoveNext())
                {
                    break;
                }
                current = enumerator.Current;
            }
            catch (Exception e)
            {
                exceptionCallback?.Invoke(e);
                break;
            }
            yield return current;
        }
    }
}

Exception? e = null;
foreach (int[] chunk in Produce().UntilFirstException(thrown => e = thrown).Chunk(10))
{
    Console.WriteLine($"Consumed: [{String.Join(", ", chunk)}]");
}
I feel like that keeps responsibilities separated nicely. If you want a helper that throws an exception instead of having to capture it yourself, you can use this as a component to simplify writing that helper:
public static IEnumerable<T[]> ChunkUntilFirstException<T>(this IEnumerable<T> source, int size)
{
    Exception? e = null;
    var result = source.UntilFirstException(thrown => e = thrown).Chunk(size);
    foreach (var element in result)
    {
        yield return element;
    }
    if (e != null)
    {
        throw new InvalidOperationException("source threw an exception", e);
    }
}
Note that this will throw a different exception than the one emitted by the producer. This lets you keep the stack trace associated with the original exception, whereas throw e would overwrite that stack trace.
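To make the stack trace point concrete, here is a small sketch comparing three ways of resurfacing a caught exception: a plain throw e, ExceptionDispatchInfo.Capture(e).Throw(), and wrapping it as an inner exception. The names StackTraceDemo, FailingProducer and TraceAfter are invented for this illustration. Each call checks whether the resulting trace still mentions the method that originally failed.

```csharp
using System;
using System.Runtime.CompilerServices;
using System.Runtime.ExceptionServices;

// Plain rethrow of the exception object: the original frames are replaced.
Console.WriteLine(StackTraceDemo.TraceAfter(e => throw e).Contains("FailingProducer"));
// expected: False

// ExceptionDispatchInfo keeps the original frames and appends the new ones.
Console.WriteLine(StackTraceDemo.TraceAfter(
    e => ExceptionDispatchInfo.Capture(e).Throw()).Contains("FailingProducer"));
// expected: True

// Wrapping leaves the original exception untouched as InnerException.
Console.WriteLine(StackTraceDemo.TraceAfter(
    e => throw new InvalidOperationException("source threw an exception", e)).Contains("FailingProducer"));
// expected: True

static class StackTraceDemo
{
    // NoInlining keeps this method visible as its own frame in the stack trace.
    [MethodImpl(MethodImplOptions.NoInlining)]
    static void FailingProducer() => throw new InvalidOperationException("Oops!");

    // Catches a fresh exception whose stack trace mentions FailingProducer.
    static Exception CatchFresh()
    {
        try { FailingProducer(); }
        catch (Exception ex) { return ex; }
        throw new InvalidOperationException("unreachable");
    }

    // Resurfaces a fresh exception via `rethrow` and returns the stack trace of the
    // original exception (the inner one when it was wrapped, otherwise the one caught).
    public static string TraceAfter(Action<Exception> rethrow)
    {
        Exception original = CatchFresh();
        try { rethrow(original); }
        catch (Exception ex) { return (ex.InnerException ?? ex).StackTrace ?? ""; }
        return "";
    }
}
```

Wrapping changes the exception type the consumer has to catch, while ExceptionDispatchInfo keeps both the type and the trace, which is the approach the other answers here take.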
You can tweak this according to your needs. If you need to catch a specific type of exception that you're expecting your producer to emit, it's easy enough to use the when contextual keyword with some pattern matching.
try
{
    foreach (int[] chunk in Produce().ChunkUntilFirstException(10))
    {
        Console.WriteLine($"Consumed: [{String.Join(", ", chunk)}]");
    }
}
catch (InvalidOperationException e) when (e.InnerException is {Message: "Oops!"})
{
    Console.WriteLine(e.InnerException.ToString());
}
Upvotes: 2
Reputation: 71805
You can't catch an exception, yield and then re-throw, because you can't have yield inside a catch. (For obvious reasons: once you have yielded then you are not in a catch any more.)
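The usual way around this restriction is to record what happened inside the catch and do the yield once the catch block has been left. A minimal sketch of that shape, with a made-up YieldAfterCatch.Describe method that turns each step into a line of text:

```csharp
using System;
using System.Collections.Generic;

var steps = new Func<int>[]
{
    () => 1,
    () => throw new InvalidOperationException("Oops!"),
    () => 3,
};
foreach (string line in YieldAfterCatch.Describe(steps))
    Console.WriteLine(line);
// prints "value 1", "error Oops!", "value 3"

static class YieldAfterCatch
{
    public static IEnumerable<string> Describe(IEnumerable<Func<int>> steps)
    {
        foreach (Func<int> step in steps)
        {
            string result;
            try
            {
                // No yield in here either: this try has a catch clause.
                result = $"value {step()}";
            }
            catch (Exception ex)
            {
                // A `yield return` placed here is rejected by the compiler,
                // so only remember the outcome.
                result = $"error {ex.Message}";
            }
            // The yield happens after the try/catch has been left.
            yield return result;
        }
    }
}
```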
I think the only solution that will preserve the original exception with the original stack trace is to use ExceptionDispatchInfo.Capture.
private static IEnumerable<IList<TSource>> ChunkIterator<TSource>(this IEnumerable<TSource> source, int size)
{
    using var e = source.GetEnumerator();
    var chunk = new List<TSource>(size);
    ExceptionDispatchInfo exDispatch = null;
    try
    {
        while(true)
        {
            try
            {
                while(e.MoveNext())
                {
                    chunk.Add(e.Current);
                    if (chunk.Count == size)
                        break;
                }
            }
            catch(Exception ex)
            {
                exDispatch = ExceptionDispatchInfo.Capture(ex);
            }
            if(chunk.Count > 0)
                yield return chunk.ToArray();
            var exDispatch2 = exDispatch;
            exDispatch = null;
            exDispatch2?.Throw();
            if(chunk.Count > 0)
                chunk.Clear();
            else
                yield break;
        }
    }
    finally
    {
        exDispatch?.Throw();
    }
}
Your foreach will always receive the last chunk of items, and only throw on the next iteration.
Upvotes: 1
Reputation: 67386
First off, a matter of semantics. There's nothing destructive in Chunk or Buffer or anything else; they just read items from a source enumerable until it's over or it throws an exception. The only destructive thing in your code is you throwing exceptions, which behaves as expected (i.e., it unwinds the stack out of your generator, out of the LINQ functions and into a catch in your code, if any exists).
Also it should be immediately obvious that every LINQ function will behave the same with regard to exceptions. It's in fact how exceptions work, and working around them to support your use case is relatively expensive: you'll need to swallow exceptions for every item you generate. This, in my humble opinion, is incredibly bad design, and you'd be fired on the spot if you worked for me and did that.
With all that out of the way, writing a BadDesignChunk like that is trivial (if expensive):
public static IEnumerable<IEnumerable<TSource>> BadDesignChunk<TSource>(this IEnumerable<TSource> source, int size)
{
    Exception caughtException = default;
    var chunk = new List<TSource>();
    using var enumerator = source.GetEnumerator();
    while(true)
    {
        while(chunk.Count < size)
        {
            try
            {
                if(!enumerator.MoveNext())
                {
                    // end of the stream, send what we have and finish
                    goto end;
                }
            }
            catch(Exception ex)
            {
                // exception, send what we have and finish
                caughtException = ex;
                goto end;
            }
            chunk.Add(enumerator.Current);
        }
        // chunk full, send a copy so that clearing the buffer
        // doesn't empty a chunk the consumer is still holding
        yield return chunk.ToArray();
        chunk.Clear();
    }
    end:
    if(chunk.Count > 0)
        yield return chunk.ToArray();
    if(caughtException is not null)
        // rethrow while keeping the original stack trace
        // (a plain "throw caughtException" would reset it)
        System.Runtime.ExceptionServices.ExceptionDispatchInfo.Capture(caughtException).Throw();
}
Upvotes: 2