Reputation: 129
Currently I'm building a web server that needs to wait for incoming messages, but I need to wait for these messages asynchronously. The messages are received in another Task that keeps running. That endless task pushes events, and the other Task needs to wait for one of these events to be received.
I will make a sketch if needed to visualize the problem better.
I have seen the TaskCompletionSource class, but the task will not complete until the client disconnects, so that's not going to work. Awaiting the task is exactly the same.
Is there a library or built-in solution for C#/.NET Core?
Thank you :)
Upvotes: 2
Views: 2656
Reputation: 131676
Channels
ASP.NET Core SignalR uses Channels to implement event streaming - a publisher method produces events asynchronously that are processed by another method. In SignalR's case it's a consumer that pushes each new event to the clients.
Doing the same in your code is easy, but make sure to follow the pattern shown in the SignalR docs - the channel is created by the worker itself and never exposed to the callers. This means there's no ambiguity about the state of the channel, as only the worker can close it.
Another important point is that you have to close the channel when finished, even if there's an exception. Otherwise, the consumers will block indefinitely.
You'll also need to pass a CancellationToken to the producer so you can terminate it at some point. Even if you don't intend to explicitly cancel the producer, you need a way to tell it to stop when the application terminates.
The following method creates the channel and ensures it closes even when an exception occurs.
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

ChannelReader<int> MyProducer(someparameters, CancellationToken token)
{
    var channel = Channel.CreateUnbounded<int>();
    var writer = channel.Writer;
    _ = Task.Run(async () =>
    {
        while (!token.IsCancellationRequested)
        {
            var i = ... //do something to produce a value
            await writer.WriteAsync(i, token);
        }
    }, token)
    //IMPORTANT: Close the channel no matter what.
    .ContinueWith(t => writer.Complete(t.Exception));
    return channel.Reader;
}
t.Exception will be null if the producer finishes gracefully or if the worker task is cancelled through the token. There's no reason to use ChannelWriter.TryComplete, as only one task is writing to the writer at a time.
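To see what the reading side observes when the producer fails, here is a minimal sketch. FailingProducer and ConsumeAsync are hypothetical names made up for this illustration; the producer follows the same Task.Run/ContinueWith shape as above and deliberately throws after two writes. The assumption is that the reader drains the buffered items first and then sees the failure surface from WaitToReadAsync.

```csharp
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class FaultingChannelSketch
{
    // Hypothetical producer: writes two values, then fails on purpose.
    public static ChannelReader<int> FailingProducer(CancellationToken token)
    {
        var channel = Channel.CreateUnbounded<int>();
        var writer = channel.Writer;
        _ = Task.Run(async () =>
        {
            await writer.WriteAsync(1, token);
            await writer.WriteAsync(2, token);
            throw new InvalidOperationException("producer failed");
        }, token)
        // The continuation runs on success, failure and cancellation alike,
        // so the channel is always closed; a failure is passed to the reader.
        .ContinueWith(t => writer.Complete(t.Exception));
        return channel.Reader;
    }

    // Hypothetical consumer: counts items and reports how the stream ended.
    public static async Task<string> ConsumeAsync(ChannelReader<int> reader)
    {
        var seen = 0;
        try
        {
            while (await reader.WaitToReadAsync().ConfigureAwait(false))
            {
                while (reader.TryRead(out _))
                {
                    seen++;
                }
            }
            return $"completed after {seen} items";
        }
        catch (Exception ex)
        {
            // t.Exception is an AggregateException, so unwrap it for the message.
            return $"faulted after {seen} items: {ex.GetBaseException().Message}";
        }
    }
}
```

Calling await FaultingChannelSketch.ConsumeAsync(FaultingChannelSketch.FailingProducer(CancellationToken.None)) should report a fault after both items were read. Without the ContinueWith call, the consumer would keep waiting on a channel that nobody will ever write to again.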
A consumer only needs that reader to consume the events:
async Task MyConsumer(ChannelReader<int> reader, CancellationToken token)
{
    while (await reader.WaitToReadAsync(token).ConfigureAwait(false))
    {
        while (reader.TryRead(out int item))
        {
            //Use that integer
        }
    }
}
It can be used like this:
var reader=MyProducer(...,cts.Token);
await MyConsumer(reader,cts.Token);
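For anyone who wants to try the pattern end to end, here is a self-contained sketch. CountingProducer and CollectAsync are hypothetical stand-ins for MyProducer and MyConsumer: the producer just emits the integers 0 to count-1 instead of real events, and the consumer collects them into a list so the result is easy to check.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class ChannelSketch
{
    // Hypothetical producer: emits 0..count-1, then closes the channel.
    public static ChannelReader<int> CountingProducer(int count, CancellationToken token)
    {
        var channel = Channel.CreateUnbounded<int>();
        var writer = channel.Writer;
        _ = Task.Run(async () =>
        {
            for (var i = 0; i < count && !token.IsCancellationRequested; i++)
            {
                await writer.WriteAsync(i, token);
            }
        }, token)
        // Close the channel whether the loop finished, failed or was cancelled.
        .ContinueWith(t => writer.Complete(t.Exception));
        return channel.Reader; // callers only ever see the reader
    }

    // Hypothetical consumer: gathers every item until the channel is completed.
    public static async Task<List<int>> CollectAsync(ChannelReader<int> reader, CancellationToken token)
    {
        var items = new List<int>();
        while (await reader.WaitToReadAsync(token).ConfigureAwait(false))
        {
            while (reader.TryRead(out int item))
            {
                items.Add(item);
            }
        }
        return items;
    }
}
```

With a CancellationTokenSource named cts, awaiting ChannelSketch.CollectAsync(ChannelSketch.CountingProducer(5, cts.Token), cts.Token) should return the values 0 through 4 in order. The consumer loop ends on its own because the producer completes the writer.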
IAsyncEnumerable
I cheated a bit here: I copied the loop code from the ChannelReader.ReadAllAsync method, which returns an IAsyncEnumerable<T> that can be looped over asynchronously. In .NET Core 3.0 and .NET Standard 2.1 (but not only there), the consumer can be replaced with just:
await foreach (var i in reader.ReadAllAsync(token))
{
    //Use that integer
}
The Microsoft.Bcl.AsyncInterfaces package adds the same interfaces to previous .NET versions.
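A minimal sketch of a ReadAllAsync-based consumer, assuming C# 8 and a runtime where ChannelReader<T>.ReadAllAsync is available. SumAsync and DemoAsync are hypothetical names; the demo fills and completes the channel inline only to keep the example short, whereas in real code the channel would stay hidden inside the producer as described above.

```csharp
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class ReadAllSketch
{
    // Hypothetical consumer: adds up every item until the channel is completed.
    public static async Task<int> SumAsync(ChannelReader<int> reader, CancellationToken token = default)
    {
        var sum = 0;
        await foreach (var i in reader.ReadAllAsync(token))
        {
            sum += i;
        }
        return sum;
    }

    // Demo only: writes three values, closes the channel, then consumes it.
    public static async Task<int> DemoAsync()
    {
        var channel = Channel.CreateUnbounded<int>();
        foreach (var value in new[] { 1, 2, 3 })
        {
            channel.Writer.TryWrite(value); // always succeeds on an open unbounded channel
        }
        channel.Writer.Complete();
        return await SumAsync(channel.Reader);
    }
}
```

Awaiting ReadAllSketch.DemoAsync() should give 6. The await foreach loop ends once the writer has been completed and the buffered items are drained, which is the same termination condition as the WaitToReadAsync/TryRead loop.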
Upvotes: 3
Reputation: 169360
There is the new Channel<T> API in .NET Core 2.1 and System.Threading.Channels that lets you consume a queue asynchronously:
Channel<int> channel = Channel.CreateUnbounded<int>();
...
ChannelReader<int> c = channel.Reader;
while (await c.WaitToReadAsync())
{
    // ReadAsync has no out parameter; TryRead drains whatever is available
    while (c.TryRead(out int item))
    {
        // process item...
    }
}
Please refer to this blog post for an introduction and more examples of how to use it.
Upvotes: 5