Reputation: 138
I am becoming a big fan of Observables, and my stack is starting to show it.
I would like to build some monitoring tools that hook into n observables and identify the number of current subscriptions on each, without being a subscriber myself.
Peeking inside some of the Rx visualisation libraries, I noticed that they monkey-patch many of the operators. That seems like overkill?
Upvotes: 1
Views: 716
Reputation: 368
I'm not sure whether the side-effect operators from the doOnXXX family are available in RxJS. Anyway, here is an idea of how this can be achieved in RxJava for a single observable:
// Upd is assumed to be an enum defined elsewhere: enum Upd { SUBSCRIBE, UNSUBSCRIBE }
// ReplaySubject buffers the updates, so the monitor below can subscribe later and still see them.
final Subject<Upd, Upd> monitorSubj = ReplaySubject.create();
Observable.just("one", "two")
    // ....
    .doOnSubscribe(() -> monitorSubj.onNext(Upd.SUBSCRIBE))
    .doOnUnsubscribe(() -> monitorSubj.onNext(Upd.UNSUBSCRIBE))
    // ....
    .subscribe(System.out::println);
monitorSubj.subscribe(new Subscriber<Upd>() {
    // Running count of active subscriptions on the monitored observable.
    private int num = 0;
    @Override
    public void onNext(final Upd upd) {
        switch (upd) {
            case SUBSCRIBE:
                num++;
                break;
            case UNSUBSCRIBE:
                num--;
                break;
        }
        System.out.println("subscribers change : num = " + num);
    }
    @Override
    public void onCompleted() {
        // ...
    }
    @Override
    public void onError(Throwable e) {
        // ...
    }
});
Other doOnXXX operators may also be useful for your task.
Of course, you can do the same without monitorSubj and just call the corresponding update-handling code directly. But you should consider the thread safety of that handling code, since it is likely to be called by multiple threads.
private int num = 0;
private final synchronized void handleUpd(final Upd upd) {
    switch (upd) {
        case SUBSCRIBE:
            num++;
            break;
        case UNSUBSCRIBE:
            num--;
            break;
    }
    System.out.println("subscribers change : num = " + num);
}
public void run() {
    Observable.just("one", "two")
        // ....
        .doOnSubscribe(() -> handleUpd(Upd.SUBSCRIBE))
        .doOnUnsubscribe(() -> handleUpd(Upd.UNSUBSCRIBE))
        // ....
        .subscribe(System.out::println);
}
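If the handler only needs to keep a count, an atomic counter is one possible alternative to the synchronized method above. The following is a minimal sketch that uses only the JDK and no RxJava; SubscriberCounter and its method names are hypothetical and would be wired into doOnSubscribe and doOnUnsubscribe by the caller.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper, not part of RxJava: a lock-free subscriber counter.
final class SubscriberCounter {
    private final AtomicInteger count = new AtomicInteger();

    // Intended to be called from doOnSubscribe; returns the new count.
    int onSubscribe() { return count.incrementAndGet(); }

    // Intended to be called from doOnUnsubscribe; returns the new count.
    int onUnsubscribe() { return count.decrementAndGet(); }

    int current() { return count.get(); }
}

final class SubscriberCounterDemo {
    public static void main(String[] args) throws InterruptedException {
        SubscriberCounter counter = new SubscriberCounter();
        Thread[] threads = new Thread[4];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(() -> {
                // Simulate many short-lived subscriptions from this thread.
                for (int j = 0; j < 1000; j++) {
                    counter.onSubscribe();
                    counter.onUnsubscribe();
                }
                counter.onSubscribe(); // leave one subscription open per thread
            });
            threads[i].start();
        }
        for (Thread t : threads) t.join();
        System.out.println("active subscribers = " + counter.current());
    }
}
```

Each thread leaves exactly one subscription open, so the final count should equal the number of threads regardless of interleaving.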
Upvotes: 0
Reputation: 18663
You could create an operator that would do it I suppose (disclaimer: untested):
Rx.Observable.prototype.activeSubscriptions = function (subscriptionObserver) {
  // A regular function is needed here: an arrow function would not bind `this` to the source observable.
  var source = this, subscriberCount = 0;
  return Rx.Observable.create((observer) => {
    var sharedSource = source.publish();
    var d1 = sharedSource.subscribe(observer);
    // Forward only errors and completion to the monitoring observer.
    var d2 = sharedSource.subscribe(
      null,
      (e) => subscriptionObserver.onError(e),
      () => subscriptionObserver.onCompleted()
    );
    // Report the decremented count when this subscription is disposed.
    var d3 = Rx.Disposable.create(() => subscriptionObserver.onNext(--subscriberCount));
    var d4 = sharedSource.connect();
    subscriptionObserver.onNext(++subscriberCount);
    return new Rx.CompositeDisposable(d1, d2, d3, d4);
  });
};
Then it could be used like so:
var subscriptionObserver = Rx.Observer.create((x) => console.log("Current #: " + x));
Rx.Observable.fromEvent($button, 'click')
  .map(e => /*Map a value*/)
  .flatMap(x => $.get(`search?q=${x.value}`))
  .activeSubscriptions(subscriptionObserver)
  .subscribe();
This is mostly a naive implementation of the approach I would take. It won't be terribly performant (for that you would need a more performance-oriented approach, like the one some of the other operators use). One caveat is that if you use it upstream of a multicast flavor, you will only ever see one subscriber, because of how those operators share subscriptions.
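To make the multicast caveat concrete, here is a small sketch in plain Java. It is a hand-rolled model, not the RxJS or RxJava API: Source, counted and shared are hypothetical names, and shared only imitates the idea of one upstream subscription being fanned out to every listener.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, hand-rolled model of an observable; not the RxJS or RxJava API.
interface Source {
    // Subscribes a listener and returns a handle that unsubscribes it.
    Runnable subscribe(Consumer<String> listener);
}

final class MulticastCaveat {
    // Wraps a source and counts the subscriptions that actually reach it.
    static Source counted(Source upstream, int[] counter) {
        return listener -> {
            counter[0]++;
            Runnable inner = upstream.subscribe(listener);
            return () -> { inner.run(); counter[0]--; };
        };
    }

    // Shares a single upstream subscription among all downstream listeners.
    static Source shared(Source upstream) {
        List<Consumer<String>> listeners = new ArrayList<>();
        Runnable[] connection = new Runnable[1];
        return listener -> {
            listeners.add(listener);
            if (connection[0] == null) {
                // Only the first listener triggers an upstream subscription.
                connection[0] = upstream.subscribe(v -> listeners.forEach(l -> l.accept(v)));
            }
            return () -> {
                listeners.remove(listener);
                if (listeners.isEmpty() && connection[0] != null) {
                    connection[0].run();
                    connection[0] = null;
                }
            };
        };
    }

    // Subscribes three listeners and returns the count the counter observed.
    static int countSeen(boolean upstreamOfShare) {
        int[] counter = {0};
        Source never = listener -> () -> { }; // emits nothing; enough to count subscriptions
        Source pipeline = upstreamOfShare
                ? shared(counted(never, counter))
                : counted(shared(never), counter);
        for (int i = 0; i < 3; i++) pipeline.subscribe(v -> { });
        return counter[0];
    }

    public static void main(String[] args) {
        System.out.println("counter upstream of share:   " + countSeen(true));
        System.out.println("counter downstream of share: " + countSeen(false));
    }
}
```

With three listeners, the counter placed upstream of the sharing step records a single subscription, while the same counter placed downstream records all three. Under this model, the monitoring operator belongs after any sharing step if the goal is to count end subscribers.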
Upvotes: 2