Reputation: 1211
I'm new to System.Threading.Channels. I have the following consumer code:
await foreach (var thing in this.Reader.ReadAllAsync(cancellationToken)
    .ConfigureAwait(false))
{
    await this.HandleThingAsync(thing, cancellationToken).ConfigureAwait(false);
}
That seems to work fine when consuming things produced by a single producer like this:
var things = await this.GetThingsAsync(cancellationToken).ConfigureAwait(false);
await foreach (var thing in things.WithCancellation(cancellationToken)
    .ConfigureAwait(false))
{
    await this.Writer.WriteAsync(thing, cancellationToken).ConfigureAwait(false);
}
this.Writer.Complete();
But when I try to add in a second producer of the same general form, as soon as one of the two producers is done (and calls this.Writer.Complete()), anything that the other producer still needs to add will be rejected because the channel is already closed. This is a problem because I want the reader to read everything, not merely everything up until the point that any one producer has nothing more to produce.
How does one deal with this situation? Is there some built-in or otherwise "standard" way? For example, perhaps a "condenser" channel which exposes multiple Channel.Writer objects (one for each "real" producer), and a single Channel.Reader (for the single "real" consumer)?
Upvotes: 1
Views: 1112
Reputation: 1211
I wound up making this class, based on my "channel condenser" idea that I mentioned in my original question. It may or may not be horrible and/or bug-ridden, but at least so far it seems to do the job in a way that seems fairly natural and unobtrusive to me:
using Nito.AsyncEx;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

namespace Rwv37.System.Threading.Channels
{
    public class ChannelCondenser<T>
    {
        private bool IsGoing { get; set; }
        private AsyncLock IsGoingLock { get; init; }
        private ConcurrentBag<Channel<T>> IncomingChannel { get; init; }
        private Channel<T> OutgoingChannel { get; init; }

        public ChannelCondenser()
        {
            this.IsGoingLock = new AsyncLock();
            this.IncomingChannel = new();
            this.OutgoingChannel = Channel.CreateUnbounded<T>();
        }

        // Forwards everything from the incoming channels to the outgoing channel,
        // and completes the outgoing channel once every incoming channel is done.
        public async Task GoAsync(CancellationToken cancellationToken = default)
        {
            using (await this.IsGoingLock.LockAsync(cancellationToken).ConfigureAwait(false))
            {
                if (this.IsGoing)
                {
                    // global:: is needed because a bare "System" inside this
                    // namespace would bind to Rwv37.System.
                    throw new global::System.InvalidOperationException("Cannot go - already going!");
                }
                this.IsGoing = true;
            }

            List<Task> tasks = new();
            foreach (var incomingChannel in this.IncomingChannel)
            {
                tasks.Add(this.HandleIncomingChannelAsync(incomingChannel, cancellationToken));
            }
            await Task.WhenAll(tasks).ConfigureAwait(false);
            this.OutgoingChannel.Writer.Complete();
        }

        // Creates a dedicated channel for one producer and returns its writer.
        public ChannelWriter<T> AddIncomingChannel()
        {
            // The channel is registered while the lock is still held, so that
            // GoAsync cannot start between the IsGoing check and the Add.
            using (this.IsGoingLock.Lock())
            {
                if (this.IsGoing)
                {
                    throw new global::System.InvalidOperationException("New incoming channels cannot be added while going!");
                }
                Channel<T> incomingChannel = Channel.CreateUnbounded<T>();
                this.IncomingChannel.Add(incomingChannel);
                return incomingChannel.Writer;
            }
        }

        public ChannelReader<T> GetOutgoingChannel()
        {
            return this.OutgoingChannel.Reader;
        }

        private async Task HandleIncomingChannelAsync(Channel<T> incomingChannel, CancellationToken cancellationToken)
        {
            await foreach (var item in incomingChannel.Reader.ReadAllAsync(cancellationToken).ConfigureAwait(false))
            {
                await this.OutgoingChannel.Writer.WriteAsync(item, cancellationToken).ConfigureAwait(false);
            }
        }
    }
}
Usage within the consumer(s) and producer(s) is completely unchanged from that shown in my original question.
The only thing I had to change outside of them is how the classes using them are constructed. Consumer construction was changed from...
private Channel<Thing> WantedThingsChannel { get; init; }
(...)
this.WantedThingsChannel = Channel.CreateUnbounded<Thing>();
this.WantedThingsHandler = new(this.WantedThingsChannel.Reader);
... to...
private ChannelCondenser<Thing> WantedThingsCondenser { get; init; }
(...)
this.WantedThingsCondenser = new();
this.WantedThingsHandler = new(this.WantedThingsCondenser.GetOutgoingChannel());
Similarly, the producer's construction was changed from...
this.WantedThingsRetriever = new(this.WantedThingsChannel.Writer);
... to...
this.WantedThingsRetriever = new(this.WantedThingsCondenser.AddIncomingChannel());
Oh, no, wait, I lied. One other change outside of them: My program's main Task.WhenAll was changed so that it additionally waits on the ChannelCondenser. So, from...
List<Task> tasks = new()
{
    this.WantedThingsHandler.GoAsync(cancellationToken),
    this.WantedThingsRetriever.GoAsync(cancellationToken),
};
... to...
List<Task> tasks = new()
{
    this.WantedThingsCondenser.GoAsync(cancellationToken),
    this.WantedThingsHandler.GoAsync(cancellationToken),
    this.WantedThingsRetriever.GoAsync(cancellationToken),
};
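For comparison, the same fan-in idea can be sketched with nothing but the base class library. This is only an illustration, not the class above: `Merge`, `ForwardAsync` and `RunAsync` are hypothetical names, and the sketch assumes each producer owns its own channel and completes its own writer, just as with `AddIncomingChannel()`.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical helper: forwards every item from all inputs into one output
// channel, and completes the output only when ALL inputs have completed.
static ChannelReader<T> Merge<T>(ChannelReader<T>[] inputs, CancellationToken ct = default)
{
    var output = Channel.CreateUnbounded<T>();

    async Task ForwardAsync(ChannelReader<T> input)
    {
        await foreach (var item in input.ReadAllAsync(ct).ConfigureAwait(false))
            await output.Writer.WriteAsync(item, ct).ConfigureAwait(false);
    }

    async Task RunAsync()
    {
        try
        {
            await Task.WhenAll(inputs.Select(input => ForwardAsync(input))).ConfigureAwait(false);
            output.Writer.Complete();
        }
        catch (Exception ex)
        {
            // Surface the failure (or cancellation) to the reader instead of hanging it.
            output.Writer.Complete(ex);
        }
    }

    _ = RunAsync();
    return output.Reader;
}

// Two producers, each with its own channel and its own Complete() call.
var first = Channel.CreateUnbounded<int>();
var second = Channel.CreateUnbounded<int>();
var merged = Merge<int>(new[] { first.Reader, second.Reader });

first.Writer.TryWrite(1);
first.Writer.TryWrite(2);
first.Writer.Complete();   // does not end the merged stream
second.Writer.TryWrite(10);
second.Writer.Complete();  // the last input to finish ends the merged stream

var received = new List<int>();
await foreach (var item in merged.ReadAllAsync())
    received.Add(item);

Console.WriteLine(received.Count); // prints 3
```

The relative order of items coming from different producers is not guaranteed; only the order within each producer's own channel is preserved.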
Upvotes: 0
Reputation: 43545
I don't think that there is a way that you could call "standard". A Channel<T> is a tool that can be used in many different ways, much like a Task or a SemaphoreSlim. In your case, you could propagate the completion of all producers by using a counter like this:
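int producersCount = X; // X is a placeholder for the total number of producers
//...
// Inside each producer:
await foreach (var thing in things)
    await channel.Writer.WriteAsync(thing);
// Only the last producer to finish completes the channel.
if (Interlocked.Decrement(ref producersCount) == 0) channel.Writer.Complete();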
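A minimal, self-contained sketch of the counter approach with two producers might look like the following. The `ProduceAsync` helper and the item values are hypothetical, made up for illustration. Placing the decrement in a `finally` block is an extra assumption on my part, so that a producer that throws still counts as finished.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

var channel = Channel.CreateUnbounded<int>();

// Number of producers that have not finished yet (two in this sketch).
int producersCount = 2;

// Hypothetical producer: writes `count` consecutive integers starting at `start`.
async Task ProduceAsync(int start, int count)
{
    try
    {
        for (int i = start; i < start + count; i++)
        {
            await channel.Writer.WriteAsync(i);
            await Task.Yield(); // give the other producer a chance to interleave
        }
    }
    finally
    {
        // Only the last producer to finish completes the channel.
        if (Interlocked.Decrement(ref producersCount) == 0)
            channel.Writer.Complete();
    }
}

var producers = new[] { ProduceAsync(0, 3), ProduceAsync(100, 5) };

// The single consumer keeps reading until the last producer has completed the channel.
var received = new List<int>();
await foreach (var item in channel.Reader.ReadAllAsync())
    received.Add(item);

await Task.WhenAll(producers);
Console.WriteLine(received.Count); // prints 8
```

The counter has to be initialized to the exact number of producers before any of them starts, otherwise the channel may be completed too early or never.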
Alternatively, if each producer is a Task, you could attach a continuation to all these tasks combined like this:
var producers = new List<Task>();
//...
_ = Task.WhenAll(producers).ContinueWith(_ => channel.Writer.Complete(),
    default, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
The discard (_) above has been used in order to communicate that the ContinueWith continuation has been launched in a fire-and-forget fashion. If, like me, you dislike throwing unobserved tasks to the wind, you can handle the completion of the producers in an async void method like this:
var producers = new List<Task>();
//...
HandleProducersCompletion();
//...
async void HandleProducersCompletion()
{
    try { await Task.WhenAll(producers); }
    finally { channel.Writer.Complete(); }
}
This way, an exception thrown by the channel.Writer.Complete() invocation will be unhandled and will crash the process. That is arguably a good thing, considering the alternative: a process that has deadlocked for no apparent reason.
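If crashing the process is too drastic, one variation (not part of the approach above, just a sketch) is to return a Task instead of using async void and to await it alongside the consumer, so that any failure surfaces at the await site. The names `ProduceAsync` and `CompleteWhenAllDoneAsync` are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

var channel = Channel.CreateUnbounded<string>();

// Hypothetical producer: writes three labelled items and never calls Complete().
async Task ProduceAsync(string name)
{
    for (int i = 0; i < 3; i++)
    {
        await channel.Writer.WriteAsync($"{name}-{i}");
        await Task.Yield();
    }
}

// Completes the channel once every producer has finished, even if one of them failed.
async Task CompleteWhenAllDoneAsync(IEnumerable<Task> tasks)
{
    try { await Task.WhenAll(tasks); }
    finally { channel.Writer.Complete(); }
}

var producers = new List<Task> { ProduceAsync("a"), ProduceAsync("b") };
var completion = CompleteWhenAllDoneAsync(producers);

var received = new List<string>();
await foreach (var item in channel.Reader.ReadAllAsync())
    received.Add(item);

// Awaiting the supervising task rethrows any producer (or Complete) failure here.
await completion;
Console.WriteLine(received.Count); // prints 6
```

The trade-off is that someone has to remember to await `completion`; if nobody does, an exception is silently dropped, which is exactly what the async void version is designed to prevent.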
Upvotes: 3