morleyc

Reputation: 2451

State machine and network socket - how to handle race conditions

I am using Stateless in my C# network project, primarily because it makes it easy to add features such as wire-level authorization after the socket connects, reconnection delays, and so on.

That said, I am running into race conditions and deadlocks of my own making, and I am looking for advice on the best way to handle them with the following states and triggers:

enum State { Stopped, Disconnected, Connecting, Connected, Resetting }
enum Trigger { Start, Stop, Connect, SetConnectComplete, Reset, SetResetComplete }

class StateMachine : StateMachine<State, Trigger>
{
    public StateMachine(Action OnDisconnected, Action OnConnecting, Action OnConnected, Action OnResetting) : base(State.Stopped)
    {
        this.Configure(State.Stopped)
            .Permit(Trigger.Start, State.Disconnected);

        this.Configure(State.Disconnected)
            .OnEntry(OnDisconnected)
            .Permit(Trigger.Connect, State.Connecting);

        this.Configure(State.Connecting)
            .OnEntry(OnConnecting)
            .Permit(Trigger.SetConnectComplete, State.Connected)
            .Permit(Trigger.Reset, State.Resetting);

        this.Configure(State.Connected)
            .OnEntry(OnConnected)
            .Permit(Trigger.Reset, State.Resetting);

        this.Configure(State.Resetting)
            .OnEntry(OnResetting)
            .Permit(Trigger.SetResetComplete, State.Disconnected);
    }
}

The intended behaviour is that the socket reconnects automatically and starts the receive loop once it is connected. On a socket error, it should move to Resetting to free its resources and then loop back around to Disconnected and connect again.

However, when I dispose the object, the connected socket is aborted, which also runs the resource cleanup, and that cleanup then tries to wait on itself.

I believe this comes down to the thread waiting on itself, so my design/state structure is fundamentally off. I would appreciate pointers on a better structure that avoids the deadlock entirely.

public class ManagedWebSocket : IDisposable
{
    readonly StateMachine stateMachine;
    Task backgroundReaderTask;

    private ClientWebSocket webSocket;
    private readonly ITargetBlock<byte[]> target;
    private readonly ILogger<ManagedWebSocket> logger;
    private CancellationTokenSource cancellationTokenSource;
    bool isDisposing;

    public ManagedWebSocket(string uri, ITargetBlock<byte[]> target, ILogger<ManagedWebSocket> logger)
    {
        this.stateMachine = new StateMachine(OnDisconnected, OnConnecting, OnConnected, OnResetting);
        this.target = target;
        this.logger = logger;
    }

    private void OnConnecting()
    {
        this.backgroundReaderTask = Task.Run(async () =>
        {
            this.cancellationTokenSource = new CancellationTokenSource();
            this.webSocket = new ClientWebSocket();
            webSocket.Options.KeepAliveInterval = KeepAliveInterval;

            try
            {
                await this.webSocket.ConnectAsync(this.uri, cancellationTokenSource.Token);
            }
            catch(WebSocketException ex)
            {
                this.logger.LogError(ex, ex.Message);
                await this.stateMachine.FireAsync(Trigger.Reset);
            }

            this.stateMachine.Fire(Trigger.SetConnectComplete);
        });
    }
    
    private void OnDisconnected()
    {
        if (isDisposing == false)
            this.stateMachine.Fire(Trigger.Connect);
    }

    private void OnResetting()
    {
        FreeResources();
        this.stateMachine.Fire(Trigger.SetResetComplete);
    }

    private void OnConnected()
    {
        this.backgroundReaderTask = Task.Run( async () => {
            try
            {
                // returns when the internal frame loop completes with websocket close, or by throwing an exception
                await this.webSocket.ReceiveFramesLoopAsync(target.SendAsync, 2048, this.cancellationTokenSource.Token);
            }
            catch (Exception ex)
            {
                this.logger.LogError(ex, ex.Message);
            }

            await this.stateMachine.FireAsync(Trigger.Reset);
        });
    }

    public async Task SendAsync(byte[] data, WebSocketMessageType webSocketMessageType)
    {
        if (this.stateMachine.State != State.Connected)
            throw new Exception($"{nameof(ManagedWebSocket)} is not yet connected.");

        try
        {
            await webSocket
                    .SendAsChunksAsync(data, webSocketMessageType, 2048, this.cancellationTokenSource.Token)
                    .ConfigureAwait(false);
        }
        catch (Exception ex)
        {
            this.logger.LogError(ex, ex.Message);
            await this.stateMachine.FireAsync(Trigger.Reset);
        }
    }

    public void Start()
    {
        this.stateMachine.Fire(Trigger.Start);
    }    

    public void FreeResources()
    {
        this.logger.LogDebug($"{nameof(ManagedWebSocket.FreeResources)}");
        this.cancellationTokenSource?.Cancel();
        this.backgroundReaderTask?.Wait();
        this.cancellationTokenSource?.Dispose();
        this.backgroundReaderTask?.Dispose();
    }

    public void Dispose()
    {
        if (isDisposing)
            return;

        isDisposing = true;
        FreeResources();
    }
}

Upvotes: 0

Views: 779

Answers (1)

Chilippso

Reputation: 491

I guess the deadlock is caused by calling FreeResources() in OnResetting(): FreeResources() blocks on backgroundReaderTask.Wait(), but backgroundReaderTask is itself still inside OnResetting(), which it reached via await this.stateMachine.FireAsync(Trigger.Reset). The task therefore ends up waiting for its own completion.
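To make the self-wait concrete, here is a minimal sketch that reproduces the same shape without Stateless or sockets. The names SelfWaitDemo and SelfWaitCompletes are made up for illustration, and a timeout is used so the demo returns instead of hanging forever as the blocking Wait() in FreeResources() would.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class SelfWaitDemo
{
    private static Task worker;

    // Returns true if the task managed to wait for itself within the timeout.
    public static bool SelfWaitCompletes()
    {
        bool finishedInTime = true;
        using var assigned = new ManualResetEventSlim(false);

        worker = Task.Run(() =>
        {
            // Make sure the 'worker' field has been assigned before using it.
            assigned.Wait();

            // Same shape as backgroundReaderTask reaching FreeResources():
            // the task blocks on its own completion, which cannot happen
            // while it is blocked. The timeout only exists to end the demo.
            finishedInTime = worker.Wait(TimeSpan.FromMilliseconds(500));
        });

        assigned.Set();
        worker.Wait(); // safe: this is a different thread from the task body
        return finishedInTime;
    }

    public static void Main()
    {
        Console.WriteLine(SelfWaitCompletes()); // prints "False"
    }
}
```

With an untimed Wait() in place of the timed one, the inner call would never return, which matches the hang described in the question.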

As a workaround, you could simply omit the "await" keyword when triggering the reset, since it will dispose the entire object anyway.

Also note that there seems to be no reason to invoke this.stateMachine.Fire(Trigger.SetConnectComplete); if an exception was thrown earlier in OnConnecting(). Just move it into the try block.

Additionally, as a best-practice side note, try following the recommended dispose pattern.
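A rough sketch of that control flow, with the socket and the state machine replaced by delegates so it runs on its own. ConnectFlow, connect and fire are hypothetical stand-ins (for webSocket.ConnectAsync and stateMachine.Fire respectively), not part of Stateless or the code in the question.

```csharp
using System;
using System.Collections.Generic;
using System.Net.WebSockets;
using System.Threading.Tasks;

public static class ConnectFlow
{
    // 'connect' stands in for webSocket.ConnectAsync(...),
    // 'fire' stands in for stateMachine.Fire(...).
    public static async Task RunAsync(Func<Task> connect, Action<string> fire)
    {
        try
        {
            await connect();
            // Only reached when connect() did not throw.
            fire("SetConnectComplete");
        }
        catch (WebSocketException)
        {
            // On failure only Reset is fired; SetConnectComplete is skipped.
            fire("Reset");
        }
    }

    // Helper for the demo: runs the flow with a connect that always fails
    // and returns the triggers that were fired.
    public static List<string> FiredOnFailure()
    {
        var fired = new List<string>();
        RunAsync(
            () => Task.FromException(new WebSocketException("connect failed")),
            fired.Add).GetAwaiter().GetResult();
        return fired;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(",", FiredOnFailure())); // prints "Reset"
    }
}
```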
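For reference, the usual shape of that pattern looks roughly like the sketch below. ResourceOwner and its IsDisposed property are illustrative names only; in ManagedWebSocket the managed resources would be the CancellationTokenSource and the socket.

```csharp
using System;
using System.Threading;

public class ResourceOwner : IDisposable
{
    private readonly CancellationTokenSource cts = new CancellationTokenSource();
    private bool disposed;

    public bool IsDisposed => disposed;

    public void Dispose()
    {
        Dispose(disposing: true);
        // No finalizer needs to run once cleanup has happened.
        GC.SuppressFinalize(this);
    }

    protected virtual void Dispose(bool disposing)
    {
        if (disposed)
            return; // calling Dispose() more than once is harmless

        if (disposing)
        {
            // Release managed resources here. Cancel without blocking on
            // the background task, so Dispose cannot wait on itself.
            cts.Cancel();
            cts.Dispose();
        }

        disposed = true;
    }
}
```

Calling Dispose() a second time returns immediately because of the disposed flag, which is the same guard the question implements with isDisposing.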

Upvotes: 1
