I'm learning to work with Blazor WebAssembly and SignalR. I want to have a single WebSocket connection (and therefore a single HubConnection instance) and share it between all the Blazor components that need to do something over SignalR.
I want to call HubConnection.StreamAsync() and then loop over the resulting stream as the data comes in (it's an endless stream). I also want the stream to be cancelled when the component is no longer shown.
I have 2 problems with this idea:
I have a vague hunch that those 2 things are somehow connected: on the server side, can a single hub only respond to a single StreamAsync() call per connection?
In the end, I've been unable to find a solution for this. What am I doing wrong? Can anyone help me out?
Code
To reproduce, follow these steps:
Start with a Blazor WebAssembly project with an ASP.NET Core backend. I use .NET 6, and most NuGet packages are at version 6.0.7. I have also installed the Microsoft.AspNetCore.SignalR.Client NuGet package in the Blazor Client project.
Do the following changes/updates:
Client - App.razor Add this code
@using Microsoft.AspNetCore.SignalR.Client
@implements IAsyncDisposable
@inject HubConnection HubConnection
...
@code {
    private CancellationTokenSource cts = new();

    protected override void OnInitialized()
    {
        base.OnInitialized();
        HubConnection.Closed += error =>
        {
            return ConnectWithRetryAsync(cts.Token);
        };
        _ = ConnectWithRetryAsync(cts.Token);
    }

    private async Task<bool> ConnectWithRetryAsync(CancellationToken token)
    {
        // Keep trying until we can start or the token is canceled.
        while (true)
        {
            try
            {
                await HubConnection.StartAsync(token);
                return true;
            }
            catch when (token.IsCancellationRequested)
            {
                return false;
            }
            catch
            {
                // Try again in a few seconds. This could be an incremental interval
                await Task.Delay(5000);
            }
        }
    }

    public async ValueTask DisposeAsync()
    {
        cts.Cancel();
        cts.Dispose();
        await HubConnection.DisposeAsync();
    }
}
Client - Program.cs Add the following singleton to DI
builder.Services.AddSingleton(sp =>
{
    var navMan = sp.GetRequiredService<NavigationManager>();
    return new HubConnectionBuilder()
        .WithUrl(navMan.ToAbsoluteUri("/string"))
        .WithAutomaticReconnect()
        .Build();
});
Client - Create a component called "StringDisplay"
@using Microsoft.AspNetCore.SignalR.Client
@inject HubConnection HubConnection
@implements IDisposable

@if(currentString == string.Empty)
{
    <i>Loading...</i>
}
else
{
    @currentString
}

@code {
    private string currentString = string.Empty;
    private CancellationTokenSource cts = new();

    protected override void OnInitialized()
    {
        base.OnInitialized();
        _ = Consumer();
    }

    protected override void OnParametersSet()
    {
        base.OnParametersSet();
        _ = Consumer();
    }

    private async Task Consumer()
    {
        try
        {
            cts.Cancel();
            cts.Dispose();
            cts = new();
            var stream = HubConnection.StreamAsync<string>("GetStrings", cts.Token);
            await foreach(var str in stream)
            {
                if(cts.IsCancellationRequested)
                    break;
                currentString = str;
                StateHasChanged();
            }
        }
        catch(Exception e)
        {
            Console.WriteLine(e);
        }
    }

    public void Dispose()
    {
        cts.Cancel();
        cts.Dispose();
    }
}
Client - Index.razor Add the StringDisplay component 3 times onto the page:
<hr />
<StringDisplay /><hr />
<StringDisplay /><hr />
<StringDisplay /><hr />
Server - Create StringGeneratorService.cs
namespace BlazorWebAssembly.Server.Services;

public class StringGeneratorService
{
    private readonly PeriodicTimer _timer;
    public event Action<string>? OnGenerated;

    public StringGeneratorService()
    {
        _timer = new PeriodicTimer(TimeSpan.FromMilliseconds(200));
        Task.Run(TimerRunnerAsync);
    }

    private async Task TimerRunnerAsync()
    {
        while (true)
        {
            await _timer.WaitForNextTickAsync();
            var str = Guid.NewGuid().ToString();
            OnGenerated?.Invoke(str);
        }
    }
}
Server - Create StringHub.cs
using BlazorWebAssembly.Server.Services;
using Microsoft.AspNetCore.SignalR;
using System.Runtime.CompilerServices;

namespace BlazorWebAssembly.Server.Hubs
{
    public class StringHub : Hub
    {
        private readonly StringGeneratorService _generatorService;

        public StringHub(StringGeneratorService generatorService)
        {
            _generatorService = generatorService;
        }

        public async IAsyncEnumerable<string> GetStrings([EnumeratorCancellation] CancellationToken cancellationToken)
        {
            using var flag = new AutoResetEvent(false);
            string currentString = string.Empty;
            var listener = (string str) => { currentString = str; flag.Set(); };
            _generatorService.OnGenerated += listener;
            cancellationToken.Register(() =>
            {
                _generatorService.OnGenerated -= listener;
            });
            while (!cancellationToken.IsCancellationRequested)
            {
                flag.WaitOne();
                yield return currentString;
            }
            yield break;
        }
    }
}
Server - Program.cs Register necessary parts
builder.Services.AddSingleton<StringGeneratorService>();
...
app.MapHub<StringHub>("/string");
Upvotes: 1
Views: 930
Reputation: 53
I've found the issue myself.
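The core issue is that my hub method blocks. Hubs are not actually singletons with a single thread (SignalR creates a new hub instance for each invocation), but by default the server handles the messages of one connection one at a time, so a hub method that blocks holds up every other call and cancellation on that connection.
If you look at StringHub.cs, there's a line in the while loop: flag.WaitOne();. This is a synchronous wait, and it blocks message processing for the whole connection.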
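To see why a synchronous wait is so harmful here, it helps to know that an async iterator with no await before its first yield runs on the thread of whoever calls MoveNextAsync(). The sketch below is a standalone illustration, not SignalR code: IteratorDemo and its members are hypothetical names, and Thread.Sleep stands in for flag.WaitOne(). It assumes (this is my reading of the behaviour, not something the documentation spells out) that the server advances the stream from the same code path that reads the connection's messages.

```csharp
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class IteratorDemo
{
    // No await before the yield: the whole body, including the wait,
    // runs synchronously inside the caller's MoveNextAsync() call.
    public static async IAsyncEnumerable<string> BlockingAsync()
    {
        Thread.Sleep(200);            // stands in for flag.WaitOne()
        yield return "blocking";
    }

    // The await hands control back to the caller while the wait is pending.
    public static async IAsyncEnumerable<string> NonBlockingAsync()
    {
        await Task.Delay(200);        // stands in for an awaitable wait
        yield return "non-blocking";
    }

    // Reports whether the first MoveNextAsync() call had already finished
    // by the time it returned, i.e. whether the caller was held up by it.
    public static async Task<bool> FirstStepCompletedSynchronously(IAsyncEnumerable<string> stream)
    {
        await using var e = stream.GetAsyncEnumerator();
        ValueTask<bool> step = e.MoveNextAsync();
        bool completed = step.IsCompleted;
        await step;
        return completed;
    }
}
```

FirstStepCompletedSynchronously(IteratorDemo.BlockingAsync()) yields true, because the caller was stuck for the whole wait, while the NonBlockingAsync() variant yields false because the caller got control back immediately.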
I've made myself an extension to AutoResetEvent that allows me to wait for a signal asynchronously. The extension would be like this:
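using System.Threading;
using System.Threading.Tasks;

public static class WaitHandleExtensions
{
    public static async Task<bool> WaitOneAsync(this WaitHandle waitHandle, CancellationToken cancellationToken)
    {
        try
        {
            // Moves the blocking wait onto a thread-pool thread so the caller can await it.
            // Note: the token only cancels the task if it has not started yet;
            // once WaitOne() is running, it keeps that pool thread until the handle is signaled.
            return await Task.Run(() => waitHandle.WaitOne(), cancellationToken);
        }
        catch (TaskCanceledException)
        {
            return false;
        }
    }
}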
With that extension, I can wait for a signal with the following line:
await flag.WaitOneAsync(cancellationToken);
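The Task.Run wrapper still parks one thread-pool thread per waiting stream. A possible alternative, shown here only as a sketch, is to bridge the event to the stream with System.Threading.Channels so that no thread waits at all. StringSource is a hypothetical stand-in for StringGeneratorService (only the event matters), and ReadAllAsync is a hypothetical extension name; the capacity-1 channel with DropOldest is my attempt to keep the original "latest value wins" behaviour.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Channels;

// Hypothetical stand-in for StringGeneratorService.
public class StringSource
{
    public event Action<string>? OnGenerated;
    public void Publish(string value) => OnGenerated?.Invoke(value);
}

public static class StringSourceExtensions
{
    // Turns the event into an async stream without any blocking wait.
    public static async IAsyncEnumerable<string> ReadAllAsync(
        this StringSource source,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        // Capacity 1 + DropOldest: a slow reader only ever sees the newest value.
        var channel = Channel.CreateBounded<string>(new BoundedChannelOptions(1)
        {
            FullMode = BoundedChannelFullMode.DropOldest
        });

        Action<string> listener = value => channel.Writer.TryWrite(value);
        source.OnGenerated += listener;
        try
        {
            // Awaits asynchronously until a value arrives or the token is canceled.
            await foreach (var value in channel.Reader.ReadAllAsync(cancellationToken))
            {
                yield return value;
            }
        }
        finally
        {
            // Runs on completion, cancellation, or disposal of the enumerator.
            source.OnGenerated -= listener;
        }
    }
}
```

In the hub, GetStrings could then simply return the stream produced from the injected service, assuming the service exposes the same OnGenerated event. Unsubscribing in finally also replaces the cancellationToken.Register callback.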
So just remember: don't block inside a hub method! A blocked hub method holds up everything else on that connection.
Upvotes: 0