Fire Lancer

Reputation: 30145

Get first IObservable event without blocking the thread/task that wants it

I am looking at using IObservable to get a response in a request-response environment within C# async methods, replacing some older callback-based code. However, I am finding that if a value is pushed to the observable (Subject.OnNext) before FirstAsync has reached the await, FirstAsync never receives that message.

Is there a straightforward way to make it work without a second task/thread plus synchronisation?

public async Task<ResponseMessage> Send(RequestMessage message)
{
    var id = Guid.NewGuid();
    var ret = Inbound.FirstAsync((x) => x.id == id).Timeout(timeout); // Never even gets invoked if response is too fast
    await DoSendMessage(id, message);
    return await ret; // Will sometimes miss the event/message
}

// somewhere else reading the socket in a loop
// may or may not be the thread calling Send
Inbound = subject.AsObservable();
while (cond)
{
    ...
    subject.OnNext(message);
}

I can't simply put the await for the FirstAsync before I send the request, as that would prevent the request from being sent.

Upvotes: 1

Views: 389

Answers (2)

MistyK

Reputation: 6222

I took a closer look, and there is a very easy solution to your problem: give the hot observable replay (cold-like) behaviour by replacing Subject with ReplaySubject. Here is the article: http://www.introtorx.com/content/v1.0.10621.0/14_HotAndColdObservables.html.

Here is the explanation:

The Replay extension method allows you to take an existing observable sequence and give it 'replay' semantics as per ReplaySubject. As a reminder, the ReplaySubject will cache all values so that any late subscribers will also get all of the values.
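
As a rough sketch of the idea (not the asker's actual code), the snippet below pushes a value into a ReplaySubject before anyone subscribes and then awaits it. The class and method names are hypothetical, it assumes the System.Reactive package is referenced, and the buffer size of 16 is an arbitrary choice to keep the cache bounded, since an unbounded ReplaySubject keeps every message it has ever seen.

public static class ReplayDemo
{
    // Hypothetical helper: returns the value seen by a late subscriber.
    public static async System.Threading.Tasks.Task<int> LateSubscriber()
    {
        // Assumption: replaying the last 16 messages is enough for this scenario.
        var subject = new System.Reactive.Subjects.ReplaySubject<int>(16);

        // The "response" arrives before anything has subscribed.
        subject.OnNext(42);

        // Awaiting subscribes only now; the cached value is replayed to it.
        return await System.Reactive.Linq.Observable.Timeout(
            System.Reactive.Linq.Observable.FirstAsync(subject, x => x == 42),
            System.TimeSpan.FromSeconds(1));
    }
}

Awaiting ReplayDemo.LateSubscriber() should complete with 42 rather than timing out. The trade-off is that every later subscriber also has to filter through the replayed backlog.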

Upvotes: 1

Stephen Cleary

Reputation: 456507

The observable is only subscribed to when it is awaited, so any message pushed before the await is missed. You can separate the subscription from the await by calling ToTask, which subscribes immediately:

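// Requires: using System.Reactive.Threading.Tasks; (for ToTask)
public async Task<ResponseMessage> Send(RequestMessage message)
{
  var id = Guid.NewGuid();
  // ToTask subscribes here, before the request is sent, so a fast response is not missed.
  // The Timeout clock also starts here, so it covers the time spent sending.
  var ret = Inbound.FirstAsync((x) => x.id == id).Timeout(timeout).ToTask();
  await DoSendMessage(id, message);
  return await ret;
}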
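
To make the difference concrete, here is a small sketch that stands apart from the Send method above. It uses a plain Subject<int> and pushes the value synchronously to imitate a response that arrives "too fast". The class and method names are hypothetical, the timeouts are arbitrary, and it assumes the System.Reactive package is referenced.

using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks; // ToTask
using System.Threading.Tasks;

public static class SubscribeTimingDemo
{
    // Subscribes eagerly via ToTask, then pushes the value.
    public static async Task<int> WithToTask()
    {
        var subject = new Subject<int>();
        Task<int> pending = subject.FirstAsync(x => x == 42)
                                   .Timeout(TimeSpan.FromSeconds(1))
                                   .ToTask(); // subscription happens on this line
        subject.OnNext(42);                   // delivered to the existing subscription
        return await pending;
    }

    // Builds the query but does not subscribe until the await.
    public static async Task<bool> WithoutToTask()
    {
        var subject = new Subject<int>();
        IObservable<int> lazy = subject.FirstAsync(x => x == 42)
                                       .Timeout(TimeSpan.FromMilliseconds(200));
        subject.OnNext(42); // nothing is subscribed yet, so the value is dropped
        try
        {
            await lazy;     // subscribes only now, after the value has gone
            return true;
        }
        catch (TimeoutException)
        {
            return false;   // the awaited observable timed out
        }
    }
}

Under these assumptions, WithToTask should complete with 42, while WithoutToTask should report false after the 200 ms timeout, which mirrors the missed message described in the question.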

Upvotes: 1
