BlackMatrix

Reputation: 504

WebSocket send and receive data asynchronously (using NuGet websocket-client)

I am struggling with the observer pattern in the NuGet package websocket-client (https://github.com/Marfusios/websocket-client).

The connection to the WebSocket server is stable and running.

Every request has a request ID inside the payload. The client sends it to the server and the server responds with the ID and the real data.

On the client side I need to assign every response to the corresponding request.

I thought I could do it like this:

public async Task<Data> GetDataAsync()
{
    var webSocket = new WebsocketClient(Uri);
    await webSocket.Start();

    // string.Contains needs a string, so convert the Guid up front
    var requestId = Guid.NewGuid().ToString();

    var tcs = new TaskCompletionSource<Data>();

    var disposable = webSocket
        .MessageReceived
        .Where(message => message.Text.Contains(requestId))
        .Subscribe(message =>
        {
            var data = ParseData(message.Text);
            tcs.SetResult(data);
        });

    return await tcs.Task;
}

But it actually never jumps into the subscribe method. Am I using it wrong?

Upvotes: 0

Views: 3444

Answers (2)

BlackMatrix

Reputation: 504

I think

public async Task<Data> GetDataAsync(string request)
{
    var requestId = Guid.NewGuid().ToString();

    var responseTask = WebSocket
        .MessageReceived
        .Timeout(TimeSpan.FromSeconds(5))
        .FirstOrDefaultAsync(message => message.Text.Contains(requestId));

    WebSocket.Send(request);

    var responseMessage = await responseTask;

    return ParseMessage(responseMessage);
}

is the way to go. I would even prefer SingleOrDefaultAsync over FirstOrDefaultAsync, because there will be only one message with that request ID. But that doesn't work. It always runs into the timeout.
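Two things in the snippet as posted might explain the timeout. The generated requestId is never put into the string passed to Send, so the server has nothing to echo back. And awaiting an observable only subscribes at the await, which here comes after the send. Separately, SingleOrDefaultAsync can only produce a value once the source completes, and the message stream of an open socket normally does not complete. Below is a minimal sketch of the subscribe-then-send order using only System.Reactive. The Subject and the echoing Send are hypothetical stand-ins for WebSocket.MessageReceived and WebSocket.Send, and the "id:body" payload format is made up for illustration.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

public static class CorrelationSketch
{
    // Hypothetical stand-in for WebSocket.MessageReceived; it never completes.
    private static readonly Subject<string> Incoming = new Subject<string>();

    // Hypothetical stand-in for WebSocket.Send: echoes the payload back as the "response".
    private static void Send(string payload) => Incoming.OnNext(payload);

    public static async Task<string> RequestAsync(string body)
    {
        var requestId = Guid.NewGuid().ToString();

        // ToTask() subscribes right away, so a reply that arrives
        // immediately after the send cannot be missed.
        Task<string> responseTask = Incoming
            .FirstAsync(text => text.Contains(requestId))
            .Timeout(TimeSpan.FromSeconds(5))
            .ToTask();

        // The generated ID has to be part of what is sent (assumed format).
        Send($"{requestId}:{body}");

        return await responseTask;
    }

    public static async Task Main()
    {
        var reply = await RequestAsync("ping");
        Console.WriteLine(reply);
    }
}
```

FirstAsync completes as soon as the first matching message arrives, which is why it suits a stream that stays open. Placing Timeout after it limits the wait for the matching reply rather than the gap between arbitrary messages.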

Upvotes: 0

Enigmativity

Reputation: 117055

You've made your code far more complicated than it needs to be. Rx lets you await an observable to get the last value produced. You can write your code like this:

public async Task<Data> GetDataAsync() =>
    await
        Observable
            .Using(
                () => new WebsocketClient(Uri),
                ws =>
                    from x in Observable.FromAsync(() => ws.Start())
                    let requestId = Guid.NewGuid().ToString()
                    from m in ws.MessageReceived
                    where m.Text.Contains(requestId)
                    select ParseData(m.Text))
            .Take(1)
            .Timeout(TimeSpan.FromSeconds(5.0));

Upvotes: -1
