Bill Barry

Reputation: 3523

rxjs how do I observe a collection of observables that are not completed

I have a bunch of observables (TypeScript Observable<boolean>) that each emit multiple times and then complete. I want to remove each one from a collection when it completes, but while they are still active I need to know their latest values. If the set weren't changing, I would have something like this:

function all(inputs: Observable<boolean>[]) {
  return combineLatest([...inputs]).pipe(map((values) => values.every(v=>v)));
}

but I need to be able to register new observables and remove ones from the list of inputs when they complete...

class AssentService {
  all(): Observable<boolean> { /* ??? */ }

  register(input: Observable<boolean>) { /* ??? */ }
}

all will only be called once (possibly before any register calls), but register can be called many times and needs to add that input until it completes. (Ultimately I need a lot more than just all, but I am pretty sure I can figure out the rest once I have this one.)

Upvotes: 1

Views: 536

Answers (2)

Mrk Sef

Reputation: 8022

This is a custom operator that behaves like combineLatest backed by a Map: each time any of the streams emits, it emits an array of the latest values from the streams that are still running.

This is a higher-order operator, so you'd use it like you would mergeAll() or concatAll(). It expects a stream of observables.

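import { Observable, defer } from 'rxjs';
import { finalize, map, mergeMap } from 'rxjs/operators';

// Note: RxJS 7 also exports a built-in combineLatestAll with different behavior;
// rename this function if you need both.
function combineLatestAll<T>() {
  return (stream$: Observable<Observable<T>>): Observable<T[]> => defer(() => {
    // Latest value of each inner stream that has emitted and is still running
    const register = new Map<number, T>();
    return stream$.pipe(
      mergeMap((inner$, index) => inner$.pipe(
        // Drop the stream's entry once it completes, errors, or is unsubscribed
        finalize(() => register.delete(index)),
        map(payload => {
          register.set(index, payload);
          return Array.from(register.values());
        })
      ))
    );
  });
}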

You can use this however you like, but this is a simple (if limited) approach.

class AssentService {
  private _registered$ = new Subject<Observable<boolean>>();

  all(): Observable<boolean[]> {
    return this._registered$.pipe(
      combineLatestAll()
    )
  }

  register(input: Observable<boolean>) {
    this._registered$.next(input);
  }
}
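Since the question asks for an Observable<boolean>, here is a minimal sketch of reducing the emitted array with every. The operator is copied under the hypothetical name latestOfRunning (chosen here only to avoid a clash with the combineLatestAll that RxJS 7 exports), and a$ and b$ are made-up inputs driven synchronously with Subjects.

```typescript
import { Observable, Subject, defer } from 'rxjs';
import { finalize, map, mergeMap } from 'rxjs/operators';

// Hypothetical name; same logic as the custom operator in this answer.
function latestOfRunning<T>() {
  return (stream$: Observable<Observable<T>>): Observable<T[]> =>
    defer(() => {
      const latest = new Map<number, T>();
      return stream$.pipe(
        mergeMap((inner$, index) =>
          inner$.pipe(
            finalize(() => latest.delete(index)),
            map((value) => {
              latest.set(index, value);
              return Array.from(latest.values());
            })
          )
        )
      );
    });
}

const registered$ = new Subject<Observable<boolean>>();

// Collapse the array of latest values into a single boolean.
const all$ = registered$.pipe(
  latestOfRunning<boolean>(),
  map((values) => values.every((v) => v))
);

const seen: boolean[] = [];
all$.subscribe((v) => seen.push(v));

// Made-up inputs for illustration.
const a$ = new Subject<boolean>();
const b$ = new Subject<boolean>();
registered$.next(a$);
registered$.next(b$);

a$.next(true);  // latest values: [true]        -> true
b$.next(false); // latest values: [true, false] -> false
b$.complete();  // b$ is removed; nothing is emitted on completion
a$.next(true);  // latest values: [true]        -> true

console.log(seen); // [true, false, true]
```

Note that a completing input only removes its entry; the combined value is not re-emitted until some remaining input emits again. If you need an emission on completion too, that would have to be added separately.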

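Why is this limited? register pushes into a plain Subject, which does not replay, and combineLatestAll() only starts listening once something subscribes to the observable returned by all(). So if a consumer calls all() (and subscribes) first and registers second, this works the way I assume you expect it to. Anything registered before that subscription is missed.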
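To make the limitation concrete, here is a small sketch of the ordering problem. It uses mergeAll() as a stand-in for the custom operator (an assumption made to keep the example short); the point is only that a plain Subject drops anything pushed before a subscriber exists.

```typescript
import { Observable, Subject, of } from 'rxjs';
import { mergeAll } from 'rxjs/operators';

const registered$ = new Subject<Observable<boolean>>();
const received: boolean[] = [];

// Registered before anyone subscribes: a plain Subject does not replay, so this is lost.
registered$.next(of(true));

// The consumer subscribes only now.
registered$.pipe(mergeAll()).subscribe((v) => received.push(v));

// Registered after the subscription: this one is seen.
registered$.next(of(false));

console.log(received); // [false]
```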

You could instead cache the most recent result and share it between all subscribers. In this case, init() is any block of code that runs when your service starts up, before your first calls to all or register.

class AssentService {
  private _registered$ = new Subject<Observable<boolean>>();
  private _cached$ = this._registered$.pipe(
    combineLatestAll(),
    shareReplay(1)
  );

  // init(): the constructor stands in for your startup code here
  constructor() {
    // Subscribe eagerly so registrations are tracked before all() is called
    this._cached$.subscribe();
  }

  all(): Observable<boolean[]> {
    return this._cached$;
  }

  register(input: Observable<boolean>) {
    this._registered$.next(input);
  }
}
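As a rough illustration of why the eager subscription plus shareReplay(1) helps, the sketch below registers an input before the consumer subscribes. It again uses mergeAll() as a stand-in for the custom operator (an assumption for brevity), so it replays a single value rather than an array.

```typescript
import { Observable, Subject, of } from 'rxjs';
import { mergeAll, shareReplay } from 'rxjs/operators';

const registered$ = new Subject<Observable<boolean>>();
const cached$ = registered$.pipe(mergeAll(), shareReplay(1));

// Eager subscription at startup keeps the pipeline connected to the Subject.
cached$.subscribe();

// Registered before any consumer subscribes.
registered$.next(of(true));

// A late consumer immediately receives the replayed latest value.
const received: boolean[] = [];
cached$.subscribe((v) => received.push(v));

registered$.next(of(false));

console.log(received); // [true, false]
```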

Upvotes: 1

martin

Reputation: 96899

You can use mergeAll() to merge higher-order Observables into the chain:

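import { Observable, Subject, timer } from 'rxjs';
import { mergeAll } from 'rxjs/operators';

// Source of inner Observables
const subject = new Subject<Observable<number>>();

subject
  .pipe(
    mergeAll(),
  )
  .subscribe({
    next: console.log,
    complete: () => console.log('completed')
  });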

...

subject.next(timer(1000));
subject.next(timer(1000));
subject.complete();

The complete handler will be called only after all nested Observables have completed and the source subject has completed. You shouldn't need to remove completed Observables because they are unsubscribed automatically.

Live demo: https://stackblitz.com/edit/rxjs-ldjj83?file=index.ts
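The completion rule can also be checked synchronously. This is a minimal sketch that swaps the timers for a Subject (inner$, a name introduced here for illustration) so the order of events is explicit.

```typescript
import { Observable, Subject } from 'rxjs';
import { mergeAll } from 'rxjs/operators';

const outer$ = new Subject<Observable<number>>();
const inner$ = new Subject<number>();
const log: string[] = [];

outer$.pipe(mergeAll()).subscribe({
  next: (v) => log.push(`next ${v}`),
  complete: () => log.push('completed'),
});

outer$.next(inner$);
inner$.next(1);

// The source completes, but inner$ is still active, so nothing completes yet.
outer$.complete();
log.push('outer done');

// Only now, with every inner Observable finished, does the result complete.
inner$.complete();

console.log(log); // ['next 1', 'outer done', 'completed']
```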

Upvotes: 0
