Reputation: 30145
I am looking at using IObservable
to get a response in a request-response environment within a c# async
methods and replace some older callback based code, but I am finding that if a value is pushed (Subject.OnNext
) to the observable but FirstAsync
is not yet at the await, then the FirstAsync
is never given that message.
Is there a straightforward way to make it work, without a 2nd task/thread plus synchronisation?
public async Task<ResponseMessage> Send(RequestMessage message)
{
var id = Guid.NewGuid();
var ret = Inbound.FirstAsync((x) => x.id == id).Timeout(timeout); // Never even gets invoked if response is too fast
await DoSendMessage(id, message);
return await ret; // Will sometimes miss the event/message
}
// somewhere else reading the socket in a loop
// may or may not be the thread calling Send
Inbound = subject.AsObservable();
while (cond)
{
...
subject.OnNext(message);
}
I cant put the await
for the FirstAsync
simply before I send the request, as that would prevent the request being sent.
Upvotes: 1
Views: 389
Reputation: 6222
I took a closer look and there is a very easy solution to your problem by just converting hot into cold observable. Replace Subject
with ReplaySubject
. Here is the article: http://www.introtorx.com/content/v1.0.10621.0/14_HotAndColdObservables.html.
Here is the explanation:
The Replay extension method allows you take an existing observable sequence and give it 'replay' semantics as per ReplaySubject. As a reminder, the ReplaySubject will cache all values so that any late subscribers will also get all of the values.
Upvotes: 1
Reputation: 456507
The await
will subscribe to the observable. You can separate the subscription from the await
by calling ToTask
:
public async Task<ResponseMessage> Send(RequestMessage message)
{
var id = Guid.NewGuid();
var ret = Inbound.FirstAsync((x) => x.id == id).Timeout(timeout).ToTask();
await DoSendMessage(id, message);
return await ret;
}
Upvotes: 1