nzyme
nzyme

Reputation: 185

Can I use RefCount but also react to each subscribe call?

I am trying to create an observable that meets the following requirements:

1) When the first client subscribes then the observable needs to connect to some backend service, and push out an initial value

2) When successive clients subscribe then the observable should push out a new value

3) When the final client disposes then the observable should disconnect from the backend system.

4) The backend service also calls OnNext regularly with other messages

So far I have something like below. I can't work out how I can react to each subscribe call but only call the disposer on the final dispose.

var o = Observable.Create((IObserver<IModelBrokerEvent> observer) =>
{
    observer.OnNext(newValue);
    _backendThingy.Subscribe(observer.OnNext);

    return Disposable.Create(() =>
    {
        _backendThingy.Unsubscribe(observer.OnNext);
    });
}

_observable = Observable.Defer(() => o).Publish().RefCount();

Upvotes: 1

Views: 180

Answers (1)

cwharris
cwharris

Reputation: 18125

There are several ways to do things similar to what you are talking about, but without exact semantics I am limited to proving a few generic solutions...

Replay Subject

The simplest is as such:

var source = ...;
var n = 1
var shared = source.Replay(n).RefCount();

The Replay operator ensures that each new subscription receives the latest n values from the source Observable. However, it does not re-invoke any subscription logic to the source Observable to achieve this. In effect, assuming the source stream has emitted values already, subsequent subscriptions will receive the previous n values synchronously upon subscription. RefCount does what you might think it should do: Connect the Replay upon the first subscription, and dispose of the connection upon the last unsubscribe.

Bidirection Communication via Proxy

Replay solves the most common use case, in that the source stream is capable of keeping itself up-to-date relatively well. However, if the source stream is updated only periodically, and new subscriptions should constitute an immediate update, then you may want to get more fancy:

var service = ...;
var source = service.Publish().RefCount();
var forceUpdate = Observable.Defer(() => Observable.Start(service.PingForUpdate));
var shared = Observable.Merge(source, forceUpdate);

Where the subscription to the server constitutes a new connection, and the PingForUpdate method indicates to the service that it's consumer would like to have a new value ASAP (which then forces the service to output said value.

Merging Periodic Updates with Initial Latest Value

Using the bidirectional communication method denotes that all consumers of this service will receive the latest value from the source upon any new subscription. However, it may be possible that we only want the latest value for the new subscriber, and all other consumers should receive values on the periodic basis.

For this, we can change the code a bit.

var service = ...;
var source = service.Publish().RefCount();
var latest = Observable.Defer(() => service.GetLatestAsObservable());
var shared = Observable.Merge(source, forceUpdate);

Where the subscription to the server constitutes a new connection, and the GetLatestAsObservable method simply retrieves the latest value from the service asynchronously via an Observable.

However, with this method you must also choose how to handle race conditions, such that you request the latest value, but a newer value is yielded before the request for the latest value is returned.

Upvotes: 3

Related Questions