Jonathan Beerhalter

Reputation: 7407

Track the (number of) observers in an Observable?

I have an observable which represents a stream of stock prices. If there are no observers on my observable sequence I'd like to be able to disconnect from the remote server that is supplying the stream of prices, but I don't want to do that until every observer has called Dispose(). Then in a similar fashion, when the first person calls Subscribe I'd like to reconnect to the remote server.

Is there a way to figure out how many observers have called Subscribe on an observable? Or perhaps a way to know when observers are calling Subscribe or Dispose?

Upvotes: 8

Views: 4078

Answers (4)

Roland Pheasant

Reputation: 1231

Bit of an old one but I came across this post as I had a problem where I needed to know the number of subscribers. Using Bart's suggestion I came up with this extension.

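public static IObservable<T> CountSubscribers<T>(this IObservable<T> source, Action<int> countChanged)
{
 // Shared across every subscription to the returned observable.
 int count = 0;

 return Observable.Defer(() =>
 {
    // Interlocked.Increment already updates count atomically; report its return
    // value rather than assigning it back, which would reintroduce a race.
    countChanged(Interlocked.Increment(ref count));
    return source.Finally(() =>
     {
        // Runs when the subscription is disposed or the sequence terminates.
        countChanged(Interlocked.Decrement(ref count));
     });
 });
}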
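A short usage sketch for an extension of this shape may help show when the callback fires. It assumes the System.Reactive package is referenced; the Subject<decimal> standing in for the price stream and the CountSubscribersDemo class are purely illustrative names, not part of the original answer.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

public static class CountingExtensions
{
    // Same idea as the extension above: Defer intercepts each Subscribe,
    // Finally intercepts each Dispose or termination.
    public static IObservable<T> CountSubscribers<T>(this IObservable<T> source, Action<int> countChanged)
    {
        int count = 0;
        return Observable.Defer(() =>
        {
            countChanged(Interlocked.Increment(ref count));
            return source.Finally(() => countChanged(Interlocked.Decrement(ref count)));
        });
    }
}

public static class CountSubscribersDemo
{
    // Returns every count reported while two observers subscribe and then dispose.
    public static List<int> Run()
    {
        var seen = new List<int>();
        var prices = new Subject<decimal>();   // hypothetical stand-in for the price stream
        var counted = prices.CountSubscribers(seen.Add);

        var first = counted.Subscribe(_ => { });
        var second = counted.Subscribe(_ => { });
        first.Dispose();
        second.Dispose();

        return seen;   // 1, 2, 1, 0
    }
}
```

Calling CountSubscribersDemo.Run() from a Main method and printing the list shows the count rising to 2 and falling back to 0. The callback runs on whichever thread subscribes or disposes, so anything it touches needs to be thread-safe in real code.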

Upvotes: 7

Bart De Smet

Reputation: 786

In general, don't implement IObservable; typically there's already something in Rx that can help you out, either directly or through composition. If you ever have to implement IObservable, use Observable.Create to do so, in order to get all the guarantees required for the observer contract etc.

As for your problem, the suggestion of using Publish and RefCount is exactly the composition you're looking for. If you want to count yourself for some reason, use Observable.Defer to intercept subscriptions, possibly with Observable.Finally to intercept sequence terminations. Or, wrap the source with an Observable.Create, forward the observer to the wrapped sequence, and wrap the returned IDisposable with counting logic (using Disposable.Create).
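The Observable.Create variant described above could be sketched roughly as follows. This is only one way to read the suggestion, assuming the System.Reactive package; OnFirstAndLast, its two callbacks and the demo class are hypothetical names introduced for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

public static class SubscriptionHooks
{
    // Hypothetical helper: wraps the source, counts live subscriptions, and
    // reports the 0 -> 1 and 1 -> 0 transitions through the two callbacks.
    public static IObservable<T> OnFirstAndLast<T>(
        this IObservable<T> source, Action onFirstSubscribe, Action onLastDispose)
    {
        int count = 0;
        return Observable.Create<T>(observer =>
        {
            if (Interlocked.Increment(ref count) == 1) onFirstSubscribe();

            // Forward the observer to the wrapped sequence.
            IDisposable inner = source.Subscribe(observer);

            // Wrap the returned IDisposable with the counting logic.
            // Disposable.Create runs its action at most once.
            return Disposable.Create(() =>
            {
                inner.Dispose();
                if (Interlocked.Decrement(ref count) == 0) onLastDispose();
            });
        });
    }
}

public static class SubscriptionHooksDemo
{
    public static List<string> Run()
    {
        var log = new List<string>();
        var prices = new Subject<decimal>();   // hypothetical stand-in for the price stream
        var hooked = prices.OnFirstAndLast(
            () => log.Add("connect"), () => log.Add("disconnect"));

        var a = hooked.Subscribe(_ => { });
        var b = hooked.Subscribe(_ => { });
        a.Dispose();
        b.Dispose();                            // last observer leaves
        hooked.Subscribe(_ => { }).Dispose();   // a new first observer arrives and leaves

        return log;   // connect, disconnect, connect, disconnect
    }
}
```

The counter itself is atomic, but the callbacks are not serialized against each other, so a connect racing with a disconnect would need extra locking. Publish and RefCount handle that coordination already, which is one reason to prefer them.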

Cheers,

-Bart (Rx team)

Upvotes: 4

Anderson Imes

Reputation: 25650

I would simply use RefCount / Publish. I always feel like if I'm implementing IObservable I'm working way too hard.

myColdObservable.Publish().RefCount();

This will make your observable stop pulsing after everyone has disconnected. Here's a sample:

var coldObservable = Observable
    .Interval(TimeSpan.FromSeconds(1))
    .ObserveOn(TaskPoolScheduler.Default) // Scheduler.TaskPool in older Rx releases
    .Select(_ => DoSomething());

var refCountObs = coldObservable.Publish().RefCount();

CompositeDisposable d = new CompositeDisposable();
d.Add(refCountObs.Subscribe(n => Console.WriteLine("First got: " + n)));
d.Add(refCountObs.Subscribe(n => Console.WriteLine("Second got: " + n)));
d.Add(refCountObs.Subscribe(n => Console.WriteLine("Third got: " + n)));

//Wait a bit for work to happen
System.Threading.Thread.Sleep(10000);

//Everyone unsubscribes
d.Dispose();

//Observe that DoSomething is not called.
System.Threading.Thread.Sleep(3000);

This does not cover the case where you actually want to know the number of subscribers, but I think this fits with your requirements of stopping work if there are no subscribers.
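Applied to the stock price scenario in the question, the same composition might look like the sketch below. It assumes the System.Reactive package and models the remote connection as a cold Observable.Create source; the "connect" and "disconnect" log entries are placeholders for whatever the real client does.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;

public static class RefCountConnectionDemo
{
    public static List<string> Run()
    {
        var log = new List<string>();

        // Cold source: subscribing opens the (hypothetical) remote connection,
        // disposing the subscription closes it.
        IObservable<decimal> remote = Observable.Create<decimal>(observer =>
        {
            log.Add("connect");
            // A real implementation would push prices to observer.OnNext here.
            return Disposable.Create(() => log.Add("disconnect"));
        });

        // Publish shares one subscription to remote; RefCount connects on the
        // first subscriber and disconnects when the last one disposes.
        IObservable<decimal> shared = remote.Publish().RefCount();

        var a = shared.Subscribe(_ => { });
        var b = shared.Subscribe(_ => { });   // no second "connect"
        a.Dispose();
        b.Dispose();                          // last observer gone: "disconnect"

        return log;   // connect, disconnect
    }
}
```

Subscribing to shared again after the last Dispose opens a fresh connection, which matches the reconnect-on-first-subscriber behaviour asked for.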

Upvotes: 10

Muhammad Hasan Khan

Reputation: 35126

IObservable<T> is an interface that you can implement. In the Subscribe method of the interface you can keep track of observers by maintaining a list internally.

The following code snippet is adapted from MSDN.

private List<IObserver<Location>> observers = new List<IObserver<Location>>();

public IDisposable Subscribe(IObserver<Location> observer) 
{
   if (! observers.Contains(observer)) 
      observers.Add(observer);

   // ------- If observers.Count == 1 create connection. -------

   return new Unsubscriber(observers, observer);
}
private class Unsubscriber : IDisposable
{
   private List<IObserver<Location>> _observers;
   private IObserver<Location> _observer;

   public Unsubscriber(List<IObserver<Location>> observers, IObserver<Location> observer)
   {
      this._observers = observers;
      this._observer = observer;
   }

   public void Dispose()
   {
      if (_observer != null && _observers.Contains(_observer))
         _observers.Remove(_observer);
      // ----------- If _observers.Count == 0 close connection. -----------
   }
}

Upvotes: 3
