Reputation: 327
I'm new to Rx in .net but have started using it with some success wrapping up some network communication
Very simplified example:
IObservable<Result> SendRequest(Request request)
{
return Observable.Create<Result>(observer =>
{
NetworkComms.SendReqeust(request,
result =>
{
observer.OnNext(result);
observer.OnCompleted();
});
return Disposable.Empty;
}).SubscribeOn(_commsScheduler);
}
The issue I have is that the command (NetworkComms.SendRequest) is not actually sent until the caller subscribes to the returned IObservable. In some cases the request is fire-and-forget command and the result is pretty meaningless, so it makes little sense for the caller to actually subscribe to the returned IObservable.
The functionality I need is:
I tried doing this using .Replay().RefCount() and doing a Subscribe() internally before returning the IObservable. This almost works, but in the case where the client subscribes after the result received (and thus after the auto-disposing of the sequence on completion) it causes the subscription code to be called again, sending the command for a second time.
Is there a simple Rx extension that can handle this scenario for me, or do I need to roll my own?
Upvotes: 1
Views: 82
Reputation: 79441
It looks like you want to use a AsyncSubject<T>
:
IObservable<Result> SendRequest(Request request)
{
var subject = new AsyncSubject<Result>();
NetworkComms.SendReqeust(request, result =>
{
subject.OnNext(result);
subject.OnCompleted();
});
return subject.AsObservable();
}
I should add that the interface you desire is a little odd. If the semantics are "The moment I call this method, a request is made, and anybody who wants to can get the response at a later time," then consider just using Task<Result>
instead of IObservable<Result>
.
Task<Result> SendRequestAsync(Request request)
{
var tcs = new TaskCompletionSource<Result>();
NetworkComms.SendReqeust(request, result => tcs.SetResult(result));
return tcs.Task;
}
Furthermore, consider putting this Task<Result> SendRequestAsync(Request request)
method directly on your NetworkComms
class.
Upvotes: 4