Mark van Straten

Reputation: 9425

How to count not finished Observable<Observable> emissions

I have an Observable that emits Observables. I want to emit the current count of inner Observables that have not yet finished (neither completed nor errored).

So given this marble diagram:

--a-----b-------c------
  a-------------------------|
        b----X
                c---------|
___________________
--1-----2----1--2---------1-0

How would I do this? Counting new emissions is fairly easy, but how do I account for errors and completions?

Upvotes: 3

Views: 2319

Answers (2)

Mark van Straten

Reputation: 9425

Based on the suggestion of @chromate and other answers on SO, I came up with the following solution:

Rx.Observable.prototype.streamLifecycleCounter = function () {
  const _this = this; // reference to our upstream observable
  return Rx.Observable.create(observer => {
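    observer.onNext(1); // +1: this stream is now active

    return _this.subscribe(
      () => {}, /* not interested in the actual values */
      err => {
        // -1: an error ends the stream, so it no longer counts as active
        observer.onNext(-1);
        observer.onCompleted();
      },
      () => {
        // -1: normal completion also ends the stream
        observer.onNext(-1);
        observer.onCompleted();
      }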
    );
  });
};


const scheduler = new Rx.TestScheduler(); 
const results = scheduler.startScheduler(
  () => {
    return Rx.Observable.range(1,5)
      .map(i => Rx.Observable.just(i).delay(i * 1000, scheduler))
      .flatMap(obs => obs.streamLifecycleCounter())
      .startWith(0)
      .scan((acc, curr) => acc + curr, 0)
      .do(console.log);
  },
  { disposed: 15000 }
);

This results in the following console output:

0 1 2 3 4 5 4 3 2 1 0
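
The counting itself is just a running sum of +1 and -1 deltas, so it can be checked without RxJS. Here is a minimal sketch in plain JavaScript, assuming a hand-written list of lifecycle events transcribed from the marble diagram in the question (`lifecycleEvents` and `runningCounts` are hypothetical names, not part of the solution above):

```javascript
// Hypothetical lifecycle events read off the question's marble diagram.
// delta is +1 when an inner Observable starts, -1 when it errors or completes.
const lifecycleEvents = [
  { source: 'a', kind: 'start', delta: +1 },
  { source: 'b', kind: 'start', delta: +1 },
  { source: 'b', kind: 'error', delta: -1 },
  { source: 'c', kind: 'start', delta: +1 },
  { source: 'c', kind: 'complete', delta: -1 },
  { source: 'a', kind: 'complete', delta: -1 },
];

// Plain-array equivalent of .scan((acc, curr) => acc + curr, 0).
function runningCounts(events) {
  let active = 0;
  return events.map(event => {
    active += event.delta;
    return active;
  });
}

console.log(runningCounts(lifecycleEvents).join(' ')); // 1 2 1 2 1 0
```

This matches the bottom line of the marble diagram: errors and completions are treated identically, each one simply subtracts one from the active count.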

Upvotes: 1

dotcs

Reputation: 2296

You can create a Subject that every other Observable uses to report its current state. You can then reduce the values emitted by this Subject to the number of currently active Observables.

See this jsbin example.

const meta$ = new Rx.Subject();

// Create some streams that we can observe.
const stream1$ = Rx.Observable.interval(300);
const stream2$ = Rx.Observable.timer(1000)
  .switchMap(() => Rx.Observable.interval(500).startWith(0))
  .take(10);
const stream3$ = Rx.Observable.timer(1500)
  .switchMap(() => Rx.Observable.interval(500).startWith(0))
  .take(2);


// Report each stream's state: true on every value, false on error or completion.
stream1$.subscribe(
  next => meta$.next({ stream1: true }),
  () => meta$.next({ stream1: false }),
  () => meta$.next({ stream1: false })
);
stream2$.subscribe(
  next => meta$.next({ stream2: true }),
  () => meta$.next({ stream2: false }),
  () => meta$.next({ stream2: false })
);
stream3$.subscribe(
  next => meta$.next({ stream3: true }),
  () => meta$.next({ stream3: false }),
  () => meta$.next({ stream3: false })
);

meta$
  .scan((state, next) => {
    return Object.assign({}, state, next);
  })
  .map(obj => {
    return Object.keys(obj)
      .map(key => +obj[key])
      .reduce((acc, x) => acc + x, 0)
  })
  .take(50) // stop after 50 updates, since stream1$ never completes
  .timestamp()
  .subscribe(x => console.log(x));
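
The scan and map steps above can be traced by hand on plain objects. This is a rough sketch without RxJS, assuming a made-up sequence of status updates (`updates`, `countActive` and `activeCounts` are illustrative names, not part of the code above):

```javascript
// Hypothetical status updates, in the order meta$ might receive them.
const updates = [
  { stream1: true },
  { stream2: true },
  { stream3: true },
  { stream3: false }, // stream3 finished
  { stream1: true },  // a repeated "still active" report changes nothing
];

// Same reduction as the .map() step: coerce each flag to 0 or 1 and sum.
function countActive(state) {
  return Object.keys(state)
    .map(key => +state[key])
    .reduce((acc, x) => acc + x, 0);
}

// Same merge as the .scan() step: later updates overwrite earlier flags.
function activeCounts(allUpdates) {
  let state = {};
  return allUpdates.map(update => {
    state = Object.assign({}, state, update);
    return countActive(state);
  });
}

console.log(activeCounts(updates)); // [ 1, 2, 3, 2, 2 ]
```

Because the state is keyed by stream name and only set to true in the next handler, a stream counts as active only after its first value arrives.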

Upvotes: 2
