Reputation: 185
I am trying to create an observable that meets the following requirements:
1) When the first client subscribes then the observable needs to connect to some backend service, and push out an initial value
2) When successive clients subscribe then the observable should push out a new value
3) When the final client disposes then the observable should disconnect from the backend system.
4) The backend service also calls OnNext regularly with other messages
So far I have something like below. I can't work out how I can react to each subscribe call but only call the disposer on the final dispose.
var o = Observable.Create((IObserver<IModelBrokerEvent> observer) =>
{
    observer.OnNext(newValue);
    _backendThingy.Subscribe(observer.OnNext);
    return Disposable.Create(() =>
    {
        _backendThingy.Unsubscribe(observer.OnNext);
    });
});
_observable = Observable.Defer(() => o).Publish().RefCount();
Upvotes: 1
Views: 180
Reputation: 18125
There are several ways to do something similar to what you describe, but without the exact semantics I am limited to providing a few generic solutions.
The simplest is as such:
var source = ...;
var n = 1;
var shared = source.Replay(n).RefCount();
The Replay operator ensures that each new subscription receives the latest n values from the source Observable. However, it does not re-invoke any subscription logic to the source Observable to achieve this. In effect, assuming the source stream has emitted values already, subsequent subscriptions will receive the previous n values synchronously upon subscription. RefCount does what you might think it should do: Connect the Replay upon the first subscription, and dispose of the connection upon the last unsubscribe.
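To make the Replay/RefCount behaviour concrete, here is a minimal sketch, assuming the System.Reactive package is referenced. The Subject named backend and the log list are hypothetical stand-ins for the real source, used only so the connect and disconnect points are visible.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class ReplayRefCountDemo
{
    public static List<string> Run()
    {
        var log = new List<string>();
        // Hypothetical stand-in for the backend; not part of the original code.
        var backend = new Subject<int>();

        // Cold wrapper that records when the backend connection is made and torn down.
        var source = Observable.Create<int>(observer =>
        {
            log.Add("connect");
            var subscription = backend.Subscribe(observer);
            return Disposable.Create(() =>
            {
                subscription.Dispose();
                log.Add("disconnect");
            });
        });

        var shared = source.Replay(1).RefCount();

        var a = shared.Subscribe(v => log.Add($"A:{v}")); // first subscriber: connects
        backend.OnNext(1);
        var b = shared.Subscribe(v => log.Add($"B:{v}")); // gets the replayed value, no reconnect
        backend.OnNext(2);
        a.Dispose();
        b.Dispose();                                      // last subscriber: disconnects

        return log;
    }
}
```

In this sketch the log should contain a single "connect" and a single "disconnect", with the second subscriber receiving the buffered value as soon as it subscribes.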
Replay solves the most common use case, in which the source stream is capable of keeping itself up-to-date relatively well. However, if the source stream is updated only periodically, and each new subscription should trigger an immediate update, then you may want something fancier:
var service = ...;
var source = service.Publish().RefCount();
var forceUpdate = Observable.Defer(() => Observable.Start(service.PingForUpdate));
var shared = Observable.Merge(source, forceUpdate);
Here, subscribing to the service constitutes a new connection, and the PingForUpdate method tells the service that its consumer would like a new value as soon as possible (which then forces the service to output that value).
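A minimal sketch of the ping-on-subscribe idea, assuming the System.Reactive package. FakeService, its Current property and its Updates stream are hypothetical, invented only for illustration. Unlike the snippet above, the ping here runs synchronously inside Defer rather than through Observable.Start, which keeps the example deterministic.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical stand-in for the backend service; not part of the original code.
public sealed class FakeService
{
    private readonly Subject<int> _updates = new Subject<int>();
    public int Current { get; set; }
    public IObservable<int> Updates => _updates;

    // Pushes the current value to everything connected to Updates.
    public void PingForUpdate() => _updates.OnNext(Current);
}

public static class ForceUpdateDemo
{
    public static (List<int> A, List<int> B) Run()
    {
        var service = new FakeService();
        var source = service.Updates.Publish().RefCount();

        // Runs once per subscription: ping the service, then contribute no values itself.
        var forceUpdate = Observable.Defer(() =>
        {
            service.PingForUpdate();
            return Observable.Empty<int>();
        });

        // source is listed first so the shared connection exists before the ping fires.
        var shared = Observable.Merge(source, forceUpdate);

        var a = new List<int>();
        var b = new List<int>();
        using (shared.Subscribe(v => a.Add(v)))     // pings with Current == 0
        {
            service.Current = 7;
            using (shared.Subscribe(v => b.Add(v))) // pings again; both subscribers see 7
            {
            }
        }
        return (a, b);
    }
}
```

In this sketch the first subscriber ends up with two values (one per ping) and the second with one, because every ping is broadcast over the shared connection.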
With this bidirectional approach, every consumer of the service receives the latest value from the source whenever any new subscription is made. However, we may want only the new subscriber to receive the latest value, while all other consumers keep receiving values on the periodic basis.
For this, we can change the code a bit.
var service = ...;
var source = service.Publish().RefCount();
var latest = Observable.Defer(() => service.GetLatestAsObservable());
var shared = Observable.Merge(source, latest);
Here, subscribing to the service constitutes a new connection, and the GetLatestAsObservable method simply retrieves the latest value from the service asynchronously via an Observable.
However, with this method you must also decide how to handle the race condition in which you request the latest value but a newer value is yielded before that request returns.
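One possible policy for that race, sketched under the assumption that a stale reply should simply be dropped: take the requested value only until the shared stream produces something. This is just one option, not the only reasonable one. The WithLatest helper and the two Subjects (backend and reply) are hypothetical names used for illustration, and the sketch assumes the System.Reactive package and a hot, shared source.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class LatestRaceDemo
{
    // Hypothetical helper: forwards the per-subscriber "latest" value only if the
    // shared stream has not emitted anything before the reply arrives.
    public static IObservable<T> WithLatest<T>(IObservable<T> source, IObservable<T> latest) =>
        Observable.Merge(source, latest.TakeUntil(source));

    public static List<int> Run(bool replyArrivesFirst)
    {
        var backend = new Subject<int>(); // stand-in for the periodic service stream
        var reply = new Subject<int>();   // stand-in for the asynchronous "latest" reply

        var source = backend.Publish().RefCount();
        var latest = Observable.Defer(() => reply.Take(1));

        var seen = new List<int>();
        using (WithLatest(source, latest).Subscribe(v => seen.Add(v)))
        {
            if (replyArrivesFirst)
            {
                reply.OnNext(4);   // reply wins the race and is delivered
                backend.OnNext(5);
            }
            else
            {
                backend.OnNext(5); // a newer value arrives first
                reply.OnNext(4);   // the stale reply is ignored
            }
            backend.OnNext(6);
        }
        return seen;
    }
}
```

If the opposite policy is wanted (always deliver the requested value, even when it is older), the TakeUntil call can be dropped, at the cost of the subscriber possibly seeing values out of order.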
Upvotes: 3