MonkeyMagiic
MonkeyMagiic

Reputation: 138

Monitor observable without being a subscriber

Observables, I am becoming a big fan and my stack is starting to show it.

I would like to build some monitoring tools to hook into *n number of observables and be able to identify the number of current subscriptions without being a subscriber myself.

Peaking inside some of the Rx visualisation libraries I have noticed they monkey patch many of the operators, seems overkill?

Upvotes: 1

Views: 716

Answers (2)

Dmytro Buryak
Dmytro Buryak

Reputation: 368

Not sure whether these side-effect operators from doOnXXX family are available for RxJS. Anyway, here's the idea how this can be achieved in RxJava for single observable:

    final Subject<Upd, Upd> monitorSubj = ReplaySubject.create();

    Observable.just("one", "two")
        // ....
        .doOnSubscribe(() -> monitorSubj.onNext(Upd.SUBSCRIBE))
        .doOnUnsubscribe(() -> monitorSubj.onNext(Upd.UNSUBSCRIBE))
        // ....
        .subscribe(System.out::println);

    monitorSubj.subscribe(new Subscriber<Upd>() {

        private int num = 0;


        @Override
        public void onNext(final Upd upd) {
            switch (upd) {
                case SUBSCRIBE:
                    num++;
                    break;
                case UNSUBSCRIBE:
                    num--;
                    break;
            }
            System.out.println("subscribers change : num = " + num);
        }

        @Override
        public void onCompleted() {
            // ...
        }

        @Override
        public void onError(Throwable e) {
            // ...
        }

    });

Also other doOnXXX operators may be useful for your task.

Of course, you can do the same without monitorSubj, just call sorresponding update handling code directly. But you should consider thread safety of that handling code since it likely be called by multiple threads.

private int num = 0;
private final synchronized void handleUpd(final Upd upd) {
    switch (upd) {
        case SUBSCRIBE:
            num++;
            break;
        case UNSUBSCRIBE:
            num--;
            break;
    }
    System.out.println("subscribers change : num = " + num);
}
public void run() {
    Observable.just("one", "two")
        // ....
        .doOnSubscribe(() -> handleUpd(Upd.SUBSCRIBE))
        .doOnUnsubscribe(() -> handleUpd(Upd.UNSUBSCRIBE))
        // ....
        .subscribe(System.out::println);
}

Upvotes: 0

paulpdaniels
paulpdaniels

Reputation: 18663

You could create an operator that would do it I suppose (disclaimer: untested):

Rx.Observable.prototype.activeSubscriptions = (subscriptionObserver) => {

  var source = this, subscriberCount = 0;
  return Rx.Observable.create((observer) => {
    var sharedSource = source.publish();

    var d1 = sharedSource.subscribe(observer);
    var d2 = sharedSource.subscribe(
      null,
      (e) => subscriptionObserver.onError(e),
      () =>  subscriptionObserver.onCompleted()
    );
    var d3 = Disposable.create(() => subscriptionObserver.onNext(--subscriberCount));

    var d4 = sharedSource.connect();
    subscriptionObserver.onNext(++subscriberCount);
    return new NaryDisposable([d1, d2, d3, d4]),
  });

};

Then it could be used like so:

var subscriptionObserver = Rx.Observer.create((x) => console.log("Current #: " + x);

Rx.Observable.fromEvent($button, 'click')
  .map(e => /*Map a value*/)
  .flatMap(x => $.get(`search?q=${x.value}`))
  .activeSubscriptions(subscriptionObserver)
  .subscribe();

This is mostly a naive implementation of the approach I would take. It won't be terribly performant (for that you would need to take a more performance oriented approach as some of the other operators are using). One caveat to this approach is that if you use it upstream of a multicast flavor you will only ever see one subscriber, because of how the implementation for those operators is shares subscriptions.

Upvotes: 2

Related Questions