Reputation: 504
I am struggling with the observer pattern in the NuGet package websocket-client (https://github.com/Marfusios/websocket-client).
The connection to the WebSocket server is stable and running.
Every request has a request ID inside the payload. The client sends it to the server and the server responds with the ID and the real data.
On the client side I need to assign every response to the corresponding request.
I thought I could do it like this:
public async Task<Data> GetDataAsync()
{
    var webSocket = new WebsocketClient(Uri);
    await webSocket.Start();
    var requestId = Guid.NewGuid().ToString();
    var tcs = new TaskCompletionSource<Data>();
    var disposable = webSocket
        .MessageReceived
        .Where(message => message.Text.Contains(requestId))
        .Subscribe(message =>
        {
            var data = ParseData(message.Text);
            tcs.SetResult(data);
        });
    return await tcs.Task;
}
But execution never reaches the Subscribe callback. Am I using it wrong?
Upvotes: 0
Views: 3444
Reputation: 504
I think
public async Task<Data> GetDataAsync(string request)
{
    var requestId = Guid.NewGuid().ToString();
    var responseTask = WebSocket
        .MessageReceived
        .Timeout(TimeSpan.FromSeconds(5))
        .FirstOrDefaultAsync(message => message.Text.Contains(requestId));
    WebSocket.Send(request);
    var responseMessage = await responseTask;
    return ParseMessage(responseMessage);
}
is the way to go. I would even prefer SingleOrDefaultAsync instead of FirstOrDefaultAsync, because there will be only one message with that request ID. But that doesn't work: it always runs into the timeout.
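Two things in the snippet above may explain the timeout, though I can't be sure without seeing the server side. First, the generated requestId is never put into the request that gets sent, so no response can contain it. Second, awaiting an observable only subscribes at the point of the await, which here is after Send. Below is a minimal sketch of the idea, assuming System.Reactive is referenced. FakeSocket, Messages, Correlation, RequestAsync and the "id|body" payload format are all made up for illustration and are not part of websocket-client; a Subject<string> stands in for MessageReceived.

using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

// Hypothetical stand-in for the WebSocket client: Send echoes the payload
// back on Messages, the way a server that returns the request ID would.
public sealed class FakeSocket
{
    private readonly Subject<string> _messages = new Subject<string>();
    public IObservable<string> Messages => _messages;
    public void Send(string payload) => _messages.OnNext("response:" + payload);
}

public static class Correlation
{
    public static async Task<string> RequestAsync(FakeSocket socket, string body)
    {
        var requestId = Guid.NewGuid().ToString();

        // ToTask() subscribes immediately, so the listener is attached before Send.
        // Timeout comes after the filter, so unrelated messages do not reset it.
        var responseTask = socket.Messages
            .FirstAsync(text => text.Contains(requestId))
            .Timeout(TimeSpan.FromSeconds(5))
            .ToTask();

        // The ID has to travel inside the payload (assumed format: "id|body"),
        // otherwise no response can ever match the filter.
        socket.Send(requestId + "|" + body);

        return await responseTask;
    }
}

Calling await Correlation.RequestAsync(new FakeSocket(), "ping") should complete with the echoed message rather than time out. With the real client, the same shape would apply to MessageReceived and Send, provided the server really echoes the ID back.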
Upvotes: 0
Reputation: 117055
You've made your code far more complicated than it needs to be. Rx lets you await an observable to get the last value it produces. You can write your code like this:
public async Task<Data> GetDataAsync() =>
    await
        Observable
            .Using(
                () => new WebsocketClient(Uri),
                ws =>
                    from x in Observable.FromAsync(() => ws.Start())
                    let requestId = Guid.NewGuid().ToString()
                    from m in ws.MessageReceived
                    where m.Text.Contains(requestId)
                    select ParseData(m.Text))
            .Take(1)
            .Timeout(TimeSpan.FromSeconds(5.0));
Upvotes: -1