Reputation: 9425
I have an Observable which emits Observables. I want to emit the current count of not finished Observable emissions
So given this marble diagram:
--a-----b-------c------
a-------------------------|
b----X
c---------|
___________________
--1-----2----1--2---------1-0
How would I do this? Counting new emissions is fairly easy but error and completion ?
Upvotes: 3
Views: 2319
Reputation: 9425
based on the suggestion of @chromate and other answers on SO i came up with the following solution:
Rx.Observable.prototype.streamLifecycleCounter = function () {
const _this = this; // reference to our upstream observable
return Rx.Observable.create(observer => {
observer.onNext(1);
return _this.subscribe(
() => {}, /* not interested in the actual values */
err => {
observer.onNext(-1);
observer.onCompleted();
},
() => {
observer.onNext(-1);
observer.onCompleted();
}
);
});
};
const scheduler = new Rx.TestScheduler();
const results = scheduler.startScheduler(
() => {
return Rx.Observable.range(1,5)
.map(i => Rx.Observable.just(i).delay(i * 1000, scheduler))
.flatMap(obs => obs.streamLifecycleCounter())
.startWith(0)
.scan((acc, curr) => acc += curr, 0)
.do(console.log);
},
{ disposed: 15000 }
);
This results in the following console output:
0 1 2 3 4 5 4 3 2 1 0
Upvotes: 1
Reputation: 2296
You can create a Subject that will be used by all other observables to tell about their current state. You can then use this Subject to reduce the state to the number of currently active Observables.
See this jsbin example.
const meta$ = new Rx.Subject();
// Create some streams that we can observe.
const stream1$ = Rx.Observable.interval(300);
const stream2$ = Rx.Observable.timer(1000)
.switchMap(() => Rx.Observable.interval(500).startWith(0))
.take(10);
const stream3$ = Rx.Observable.timer(1500)
.switchMap(() => Rx.Observable.interval(500).startWith(0))
.take(2);
stream1$.subscribe(
next => meta$.next({ stream1: true }),
() => meta$.next({ stream1: false }),
() => meta$.next({ stream1: false })
);
stream2$.subscribe(
next => meta$.next({ stream2: true }),
() => meta$.next({ stream2: false }),
() => meta$.next({ stream2: false })
);
stream3$.subscribe(
next => meta$.next({ stream3: true }),
() => meta$.next({ stream3: false }),
() => meta$.next({ stream3: false })
);
meta$
.scan((state, next) => {
return Object.assign({}, state, next);
})
.map(obj => {
return Object.keys(obj)
.map(key => +obj[key])
.reduce((acc, x) => acc + x, 0)
})
.take(50) // don't create endless loop
.timestamp()
.subscribe(x => console.log(x));
Upvotes: 2