Reputation: 7407
I have an observable which represents a stream of stock prices. If there are no observers on my observable sequence I'd like to be able to disconnect from the remote server that is supplying the stream of prices, but I don't want to do that until every observer has called Dispose(). Then in a similar fashion, when the first person calls Subscribe I'd like to reconnect to the remote server.
Is there a way to figure out how many observers have called subscribe on an observable? Or perhaps a way to know when observers are calling Subscribe or Dispose?
Upvotes: 8
Views: 4078
Reputation: 1231
Bit of an old one but I came across this post as I had a problem where I needed to know the number of subscribers. Using Bart's suggestion I came up with this extension.
public static IObservable<T> CountSubscribers<T>(this IObservable<T> source, Action<int> countChanged)
{
int count = 0;
return Observable.Defer(() =>
{
count = Interlocked.Increment(ref count);
countChanged(count);
return source.Finally(() =>
{
count = Interlocked.Decrement(ref count);
countChanged(count);
});
});
}
Upvotes: 7
Reputation: 786
In general, don't implement IObservable; typically there's already soemthing in Rx that can help you out, either directly or through composition. If you ever have to implement IObservable, use Observable.Create to do so, in order to get all the guaranteed required for the observer contract etc.
As for your problem - the suggestion of using Publish and RefCount is exactly the composition you're looking for. If you want to count yourself for some reason, use Observable.Defer to intercept subscriptions, possibly with Observable.Finally to intercept sequence terminations. Or, wrap the source with an Observable.Create, forward the observer to the wrapped sequence, and wrap the returned IDisposable with counting logic (using Disposable.Create).
Cheers,
-Bart (Rx team)
Upvotes: 4
Reputation: 25650
I would simply use RefCount / Publish. I always feel like if I'm implementing IObservable I'm working way too hard.
myColdObservable.Publish().RefCount();
This will make your observable stop pulsing after everyone has disconnected. Here's a sample:
var coldObservable = Observable
.Interval(TimeSpan.FromSeconds(1))
.ObserveOn(Scheduler.TaskPool)
.Select(_ => DoSomething());
var refCountObs = coldObservable.Publish().RefCount();
CompositeDisposable d = new CompositeDisposable();
d.Add(refCountObs.Subscribe(n => Console.WriteLine("First got: " + n)));
d.Add(refCountObs.Subscribe(n => Console.WriteLine("Second got: " + n)));
d.Add(refCountObs.Subscribe(n => Console.WriteLine("Third got: " + n)));
//Wait a bit for work to happen
System.Threading.Thread.Sleep(10000);
//Everyone unsubscribes
d.Dispose();
//Observe that DoSomething is not called.
System.Threading.Thread.Sleep(3000);
This does not cover the case where you actually want to know the number of subscribers, but I think this fits with your requirements of stopping work if there are no subscribers.
Upvotes: 10
Reputation: 35126
IObservable<T>
is an interface that you can implement. In the Subscribe method of the interface you can keep track of observers by maintaining a list internally.
Following code snippet is from MSDN.
private List<IObserver<Location>> observers;
public IDisposable Subscribe(IObserver<Location> observer)
{
if (! observers.Contains(observer))
observers.Add(observer);
// ------- If observers.Count == 1 create connection. -------
return new Unsubscriber(observers, observer);
}
private class Unsubscriber : IDisposable
{
private List<IObserver<Location>>_observers;
private IObserver<Location> _observer;
public Unsubscriber(List<IObserver<Location>> observers, IObserver<Location> observer)
{
this._observers = observers;
this._observer = observer;
}
public void Dispose()
{
if (_observer != null && _observers.Contains(_observer))
_observers.Remove(_observer);
// ----------- if observers.Count == 0 close connection -----------
}
}
Upvotes: 3