Andy
Andy

Reputation: 327

RX IObserver subscription timing

I'm new to Rx in .net but have started using it with some success wrapping up some network communication

Very simplified example:

    IObservable<Result> SendRequest(Request request)
    {
        return Observable.Create<Result>(observer =>
        {
            NetworkComms.SendReqeust(request,
                result =>
                {
                    observer.OnNext(result);
                    observer.OnCompleted();
                });
            return Disposable.Empty;
        }).SubscribeOn(_commsScheduler);
    }

The issue I have is that the command (NetworkComms.SendRequest) is not actually sent until the caller subscribes to the returned IObservable. In some cases the request is fire-and-forget command and the result is pretty meaningless, so it makes little sense for the caller to actually subscribe to the returned IObservable.

The functionality I need is:

  1. The command is sent immediately, even if the caller never subscribes to the IObservable
  2. If and when the client subscribes, they will get the result even if they are late subscribing
  3. The command is only ever sent once, but all subscriptions should get the same results.

I tried doing this using .Replay().RefCount() and doing a Subscribe() internally before returning the IObservable. This almost works, but in the case where the client subscribes after the result received (and thus after the auto-disposing of the sequence on completion) it causes the subscription code to be called again, sending the command for a second time.

Is there a simple Rx extension that can handle this scenario for me, or do I need to roll my own?

Upvotes: 1

Views: 82

Answers (1)

Timothy Shields
Timothy Shields

Reputation: 79441

It looks like you want to use a AsyncSubject<T>:

IObservable<Result> SendRequest(Request request)
{
    var subject = new AsyncSubject<Result>();
    NetworkComms.SendReqeust(request, result =>
    {
        subject.OnNext(result);
        subject.OnCompleted();
    });
    return subject.AsObservable();
}

I should add that the interface you desire is a little odd. If the semantics are "The moment I call this method, a request is made, and anybody who wants to can get the response at a later time," then consider just using Task<Result> instead of IObservable<Result>.

Task<Result> SendRequestAsync(Request request)
{
    var tcs = new TaskCompletionSource<Result>();
    NetworkComms.SendReqeust(request, result => tcs.SetResult(result));
    return tcs.Task;
}

Furthermore, consider putting this Task<Result> SendRequestAsync(Request request) method directly on your NetworkComms class.

Upvotes: 4

Related Questions