Reputation: 674
I'm having trouble implementing the following properly:
I have two IAsyncEnumerable
instances (of the same type), one is the "primary" source, the purpose of the other is so the user can "insert" items on demand.
So if the primary source ends, the secondary also needs to stop, ending the enumeration.
The caller of the Merge function also needs to be able to react when the primary source ends (i.e. cleanup the secondary enumerable).
Given this I came up with the following implementation:
static async IAsyncEnumerable<object?> Merge(IAsyncEnumerable<object?> primary, IAsyncEnumerable<object?> secondary, Action primaryFinished, [EnumeratorCancellation] CancellationToken ct) {
var pIt = primary.GetAsyncEnumerator(ct);
var sIt = secondary.GetAsyncEnumerator(ct);
await using var _pIt = pIt.ConfigureAwait(false);
await using var _sIt = sIt.ConfigureAwait(false);
Task<bool>? pItTask = null;
Task<bool>? sItTask = null;
IAsyncEnumerator<object?>? it = null;
while (true) {
Task<bool> task;
try {
pItTask ??= pIt.MoveNextAsync().AsTask();
sItTask ??= sIt.MoveNextAsync().AsTask();
task = await Task.WhenAny(pItTask, sItTask).ConfigureAwait(false);
if (pItTask == task) {
pItTask = null;
it = pIt;
if (!task.Result) {
primaryFinished();
yield break;
}
} else {
sItTask = null;
it = sIt;
}
} catch (Exception) {
primaryFinished();
yield break;
}
if (task.Result) yield return it.Current;
}
}
The problem is that this will throw a NotSupportedException
if the primary source throws an exception, which messes up my exception handling while going up the stack.
I found a github issue which mentions the problem but doesn't really solve it (I'm getting the exception even with the delay).
Full Code to reproduce the issue:
https://dotnetfiddle.net/v7SgEV
//Needs the "System.Interactive.Async" nuget package
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Runtime.CompilerServices;
using System.Threading.Channels;
while (true) {
try {
using var cts = new CancellationTokenSource();
var secondaryDataSource = Channel.CreateUnbounded<object?>();
var reader = PrimaryDataSource(cts.Token).Do(x => {
Console.WriteLine("---");
});
var combinedSource = Merge(
reader,
secondaryDataSource.Reader.ReadAllAsync(cts.Token),
() => secondaryDataSource.Writer.TryComplete(),
cts.Token
);
await foreach (var item in combinedSource) {
Console.WriteLine(item ?? "<Null>");
throw new Exception();
}
} catch (NotSupportedException ex) {
throw;
} catch (Exception) {
}
}
static async IAsyncEnumerable<object?> PrimaryDataSource([EnumeratorCancellation] CancellationToken ct) {
while (true) {
await Task.Delay(Random.Shared.Next(100), ct).ConfigureAwait(false);
yield return "PrimaryDataSource";
throw new Exception();
yield break;
}
}
static async IAsyncEnumerable<object?> Merge(IAsyncEnumerable<object?> primary, IAsyncEnumerable<object?> secondary, Action primaryFinished, [EnumeratorCancellation] CancellationToken ct) {
var pIt = primary.GetAsyncEnumerator(ct);
var sIt = secondary.GetAsyncEnumerator(ct);
await using var _pIt = pIt.ConfigureAwait(false);
await using var _sIt = sIt.ConfigureAwait(false);
Task<bool>? pItTask = null;
Task<bool>? sItTask = null;
IAsyncEnumerator<object?>? it = null;
while (true) {
Task<bool> task;
try {
pItTask ??= pIt.MoveNextAsync().AsTask();
sItTask ??= sIt.MoveNextAsync().AsTask();
task = await Task.WhenAny(pItTask, sItTask).ConfigureAwait(false);
if (pItTask == task) {
pItTask = null;
it = pIt;
if (!task.Result) {
primaryFinished();
yield break;
}
} else {
sItTask = null;
it = sIt;
}
} catch (Exception) {
primaryFinished();
if (pItTask != null) await pItTask;
if (sItTask != null) await sItTask;
yield break;
}
if (task.Result) yield return it.Current;
}
}
Upvotes: 1
Views: 81
Reputation: 674
Going by the comments by TheodorZoulias/shingo I found the Problem and fixed it:
private static async IAsyncEnumerable<object?> Merge(IAsyncEnumerable<object?> primary, IAsyncEnumerable<IList<object?>> secondary, Action primaryFinished, [EnumeratorCancellation] CancellationToken ct) {
var pIt = primary.GetAsyncEnumerator(ct);
var sIt = secondary.GetAsyncEnumerator(ct);
var _pIt = pIt.CAf();
var _sIt = sIt.CAf();
Task<bool>? pItTask = null;
Task<bool>? sItTask = null;
try {
IAsyncEnumerator<object?>? it = null;
while(true) {
Task<bool> task;
pItTask ??= pIt.MoveNextAsync(ct).AsTask();
sItTask ??= sIt.MoveNextAsync(ct).AsTask();
task = await Task.WhenAny(pItTask, sItTask).CAf();
if(pItTask == task) {
pItTask = null;
it = pIt;
if(!task.Result) {
primaryFinished();
yield break;
}
} else {
sItTask = null;
it = sIt;
}
if(task.Result) {
yield return it.Current;
}
}
} finally {
try { await _pIt.DisposeAsync(); } catch { }
try { await _sIt.DisposeAsync(); } catch { }
}
}
The important parts are the try/catches around the DisposeAsync
s since they might throw.
Upvotes: 0