Reputation: 41
I am working on a .net core 3.0 web application and have decided to use System.Threading.Channels in a singleton service. The top level of my scoped request services injects this singleton to access its channels.
I have decided to use this pattern to decouple the requests (which produce live updates for other connected clients) from the execution of those updates.
The implementation of ONE channel within an object has many examples out there.
Can anyone tell me if its possible/advisable to use multiple channels within my singleton?
I'm not yet running into any problems creating multiple channels and "starting" them when the singleton is created. I just haven't got to a point where I can test with multiple clients requests hitting different channels on the singleton to see if it works well. (Or at all? ... )
My main motivation for using multiple channels is I want the singleton to do different things based on the type of the item in the channel.
public class MyChannelSingleton
{
public Channel<MyType> TypeOneChannel = Channel.CreateUnbounded<MyType>();
public Channel<MyOtherType> TypeTwoChannel = Channel.CreateUnbounded<MyOtherType>();
public MyChannelSingleton()
{
StartChannels();
}
private void StartChannels()
{
// discarded async tasks due to calling in ctor
_ = StartTypeOneChannel();
_ = StartTypeTwoChannel();
}
private async Task StartTypeOneChannel()
{
var reader = TypeOneChannel.Reader;
while (await reader.WaitToReadAsync())
{
if (reader.TryRead(out MyType item))
{
// item is sucessfully read from channel
}
}
}
private async Task StartTypeTwoChannel()
{
var reader = TypeTwoChannel.Reader;
while (await reader.WaitToReadAsync())
{
if (reader.TryRead(out MyOtherType item))
{
// item is sucessfully read from channel
}
}
}
}
I also expect to never "Complete" the channels and have them available for the lifetime of the application.
Upvotes: 3
Views: 10944
Reputation: 131492
You can use as many as you want, provided you use them correctly. In fact, using a background service (essentially a singleton) that exposes a processing pipeline is a very common way to use them in .NET Core.
Channels are not just async queues. They are similar to DataFlow blocks - they can be used to create processing pipelines, with each block/worker processing the data from an input buffer/ChannelReader and forwarding the results to an output buffer/ChannelWriter. DataFlow blocks handle asynchronous processing through tasks themselves. With channels, we need to handle the worker tasks ourselves.
A very important concept we need to keep in mind is that channels aren't accessed directly. In fact, in almost all cases they shouldn't even be exposed as fields or properties. In most cases, only a ChannelReader is needed. In some cases, eg at the head of a pipeline, a ChannelWriter may be exposed. Or not.
Individual workers/steps
A typical worker step would look like this
private ChannelReader<MyType2> Step1(ChannelReader<MyType> reader,CancellationToken token=default)
{
var channel=Channel.CreateUnbounded<MyOtherType>();
var writer=channel.Writer;
_ = Task.Run(async ()=>{
await foreach(var item from reader.ReadAllAsync(token))
{
MyType2 result=........;
await writer.WriteAsync(result);
}
},token).ContinueWith(t=>channel.TryComplete(t));
return channel.Reader;
}
Some things to note :
Task.WhenAll
to await for all workers to complete before closing the channel.Combining steps
Multiple steps can be combined by passing one's output reader to another's input reader, eg :
var cts=new CancelaltionTokenSource();
var step1=Step1(headReader,cts.Token);
var step2=Step2(step1,cts.Token);
var step3=Step3(step2,cts.Token);
...
await stepN.Completion;
The CancellationTokenSource can be used to end the pipeline prematurely or set a timeout as a safeguard against hanged pipelines.
The pipeline head
The "head" reader could come from an "adapter" method like :
private ChannelReader<T> ToChannel(IEnumerable<T> input,CancellationToken token)
{
var channel=Channel.CreateUnbounded<T>();
var writer=channel.Writer;
foreach(var item from input)
{
if (token.IsCancellationRequested)
{
break;
}
writer.TryWrite(result);
}
//No-one else is going to complete this channel
channel.Complete();
return channel.Reader;
}
In the case of a background service, we could use a service method to "post" input to a head channel, eg :
class MyService
{
Channel<MyType0> _headChannel;
public MyService()
{
_headChannel=Channel.CreateBounded<MyType0>(5);
}
public async Task ExecuteAsync(CancellationToken token)
{
var step1=Step1(_headChannel.Reader,token);
var step2=Step2(step1,token);
await step2.Completion;
}
public Task PostAsync(MyType0 input)
{
return _headChannel.Writer.WriteAsync(input);
}
public Stop()
{
_headChannel.Writer.TryComplete();
}
...
}
I'm using method names that look like the BackgroundService method names on purpose. StartAsync or ExecuteAsync can be used to set up the pipeline. StopAsync can be used to signal its completion, eg when the end user hits Ctrl+C.
Another useful technique shown in the queued BackgroundService example is registering an interface that clients can use to post messages instead of accessing the service class directly, eg :
interface IQueuedService<T>
{
Task PostAsync(T input);
}
Combined with System.Linq.Async
The ReadAllAsync()
method returns an IAsyncEnumerable<T>
which means we can use operators in System.Linq.Async like Where or Take to filter, batch or transform messages eg :
private ChannelReader<MyType> ActiveOnly(ChannelReader<MyType> reader,CancellationToken token=default)
{
var channel=Channel.CreateUnbounded<MyType>();
var writer=channel.Writer;
_ = Task.Run(async ()=>{
var inpStream=reader.ReadAllAsync(token)
.Where(it=>it.IsActive);
await foreach(var item from inpStream)
{
await writer.WriteAsync(item);
}
},token).ContinueWith(t=>channel.TryComplete(t));
return channel.Reader;
}
Upvotes: 10
Reputation: 43553
A Channel<T>
is just a thread-safe async queue. It doesn't do any processing by itself, it is just a passive in-memory FIFO storage. You can have as many of them as you want.
You could take advantage of the fact that a Channel
exposes separately a Reader
and a Writer
, to limit the access of the clients of your class to the minimum functionality they need. In other words instead of exposing properties of type Channel<T>
, you could consider exposing properties of type ChannelWriter<T>
or ChannelReader<T>
.
Also creating unbounded channels should be done with caution. A single misused channel could make your application a victim of OutOfMemoryException
quite easily.
An alternative of exposing properties of type ChannelReader<T>
could be exposing IAsyncEnumerable<T>
s.
Upvotes: 1
Reputation: 9804
Unfortunately I can not find the sourcecode. And calling the Documentation sparse would be a understatement. So I can at best tell you "if it was my class, how I would do it".
The big issue with having multiple channels in memory - particular unbounded - would be memory fragmentation causing a early OOM. Indeed with even one unbounded, a big issue would be having to grow the collection. List<T>
is little more then a wrapper around a T[]
with some automatic growth support. Another issue with unbounded lists, is that sooner or later you run out of indexes.
How would I solve this? A Linked List. In about 90% of all cases, a Linked List would be the last collection I would even consider. The remaining 10% are Queues and Queue like constructs. And channels look very much like a Queue. Of those 10% cases, in 9% I would just use whatever the Queue implementation does. This is the remaining 1%.
For random access the Linked List is the worst possible collection. For queues it is doable. But at avoiding Fragmentation related OOMs in .NET? For minimising the cost of growth? For getting around the the hard array limit? There the Linked List is absolutely unbeatable.
And if it does not do that? It should be doable to make your own Version of channel that does do that and just replace it.
Upvotes: -3