GigAHerZ
GigAHerZ

Reputation: 53

Blazor WebAssembly SignalR Streams - can't have multiple components

I'm learning to work with Blazor WebAssembly and SignalR. I want to have single WebSocket connection (therefore a single HubConnection instance) and share that between all the Blazor components that need to do something over SignalR WebSockets.

I want to call HubConnection.StreamAsync() and then loop over the generated stream as the data comes in (it's an endless stream). I also want it to cancel the stream when component is not shown anymore.

I have 2 problems with this idea:

  1. Only one component is capable of streaming values. Other components get stuck on "await foreach" and never receive any items.
  2. During component's dispose, i trigger the cancellation token, but the server side doesn't receive a cancellation. Only when i refresh the whole browser, the cancellation is triggered.
    • When i traverse to any other Blazor page and then return to the page where my streaming components are, now no component ever receives items.

I have a vague hunch that those 2 things are somehow connected - somehow on server side, a single hub can only really respond to single StreamAsync() call in single connection?

In the end, i've been unable to find a solution for this. What am i doing wrong? Maybe any of you could help me out on this?

Code

To reproduce, you may follow the instructions:

Start with Blazor WebAssembly with ASP.NET MVC backend project. I use .net core 6, most nugets are with version 6.0.7. I have also installed nuget Microsoft.AspNetCore.SignalR.Client on Blazor Client project.

Do the following changes/updates:

Client - App.razor Add this code

@using Microsoft.AspNetCore.SignalR.Client
@implements IAsyncDisposable
@inject HubConnection HubConnection

...

@code {
    private CancellationTokenSource cts = new();

    protected override void OnInitialized()
    {
        base.OnInitialized();

        HubConnection.Closed += error =>
        {
            return ConnectWithRetryAsync(cts.Token);
        };

        _ = ConnectWithRetryAsync(cts.Token);
    }

    private async Task<bool> ConnectWithRetryAsync(CancellationToken token)
    {
        // Keep trying to until we can start or the token is canceled.
        while (true)
        {
            try
            {
                await HubConnection.StartAsync(token);
                return true;
            }
            catch when (token.IsCancellationRequested)
            {
                return false;
            }
            catch
            {
                // Try again in a few seconds. This could be an incremental interval           
                await Task.Delay(5000);
            }
        }
    }

    public async ValueTask DisposeAsync()
    {
        cts.Cancel();
        cts.Dispose();
        await HubConnection.DisposeAsync();
    }
}

Client - Program.cs Add the following singleton to DI

builder.Services.AddSingleton(sp =>
{
    var navMan = sp.GetRequiredService<NavigationManager>();
    return new HubConnectionBuilder()
        .WithUrl(navMan.ToAbsoluteUri("/string"))
        .WithAutomaticReconnect()
        .Build();
});

Client - Create a component called "StringDisplay"

@using Microsoft.AspNetCore.SignalR.Client
@inject HubConnection HubConnection
@implements IDisposable

@if(currentString == string.Empty)
{
    <i>Loading...</i>
}
else
{
    @currentString
}

@code {
    private string currentString = string.Empty;
    private CancellationTokenSource cts = new();

    protected override void OnInitialized()
    {
        base.OnInitialized();

        _ = Consumer();
    }

    protected override void OnParametersSet()
    {
        base.OnParametersSet();

        _ = Consumer();
    }

    private async Task Consumer()
    {
        try
        {
            cts.Cancel();
            cts.Dispose();
            cts = new();

            var stream = HubConnection.StreamAsync<string>("GetStrings", cts.Token);

            await foreach(var str in stream)
            {
                if(cts.IsCancellationRequested)
                    break;

                currentString = str;
                StateHasChanged();
            }
        }
        catch(Exception e)
        {
            Console.WriteLine(e);
        }
    }

    public void Dispose()
    {
        cts.Cancel();
        cts.Dispose();
    }
}

Client - Index.razor Add the StringDisplay component 3 times onto the page:

<hr />

<StringDisplay /><hr />
<StringDisplay /><hr />
<StringDisplay /><hr />

Server - Create StringGeneratorService.cs

namespace BlazorWebAssembly.Server.Services;

public class StringGeneratorService
{
    private readonly PeriodicTimer _timer;

    public event Action<string>? OnGenerated;

    public StringGeneratorService()
    {
        _timer = new PeriodicTimer(TimeSpan.FromMilliseconds(200));

        Task.Run(TimerRunnerAsync);
    }

    private async Task TimerRunnerAsync()
    {
        while (true)
        {
            await _timer.WaitForNextTickAsync();

            var str = Guid.NewGuid().ToString();

            OnGenerated?.Invoke(str);
        }
    }
}

Server - Create StringHub.cs

using BlazorWebAssembly.Server.Services;
using Microsoft.AspNetCore.SignalR;
using System.Runtime.CompilerServices;

namespace BlazorWebAssembly.Server.Hubs
{
    public class StringHub : Hub
    {
        private readonly StringGeneratorService _generatorService;

        public StringHub(StringGeneratorService generatorService)
        {
            _generatorService = generatorService;
        }

        public async IAsyncEnumerable<string> GetStrings([EnumeratorCancellation] CancellationToken cancellationToken)
        {
            using var flag = new AutoResetEvent(false);

            string currentString = string.Empty;

            var listener = (string str) => { currentString = str; flag.Set(); };

            _generatorService.OnGenerated += listener;

            cancellationToken.Register(() =>
            {
                _generatorService.OnGenerated -= listener;
            });

            while (!cancellationToken.IsCancellationRequested)
            {
                flag.WaitOne();

                yield return currentString;
            }

            yield break;
        }

    }
}

Server - Program.cs Register necessary parts

builder.Services.AddSingleton<StringGeneratorService>();
...
app.MapHub<StringHub>("/string");

Upvotes: 1

Views: 930

Answers (1)

GigAHerZ
GigAHerZ

Reputation: 53

I've found the issue myself.

The core issue is - SignalR Hubs are singletons with single thread.

If you look at StringHub.cs, there's a line in while loop: flag.WaitOne();. This blocks the whole thread for that Hub implementation.

I've made myself an extension to AutoResetEvent that allows me to wait for a signal asynchronously. The extension would be like this:

public static async Task<bool> WaitOneAsync(this WaitHandle waitHandle, CancellationToken cancellationToken)
{
    try
    {
        return await Task.Run(waitHandle.WaitOne, cancellationToken);
    }
    catch (TaskCanceledException)
    {
        return false;
    }
}

With that extension, I can wait for a signal with following line: await flag.WaitOneAsync(cancellationToken);

So just remember - Hubs are single-threaded on their own! Don't block their thread!

Upvotes: 0

Related Questions