synthet1c
synthet1c

Reputation: 6282

rxjs - watch multiple observables, respond when any change

I would like to be able to watch multiple observables and perform some action when any of them change. Similar to how zip works, but without needing each observable to change its value. Similarly forkJoin is not suitable as it only fires when all watched observables have fired.

eg. ideally when any observable value one$, two$ or three$ changes the subscribe function should trigger giving the current state of the Observable. at present I am using BehavourSubject's to the value should be accessible using BehaviourSubject.getValue()

magicCombinator(
  one$,
  two$,
  three$,
).subscribe(([ one, two, three ]) => {
  console.log({ one, two, three });
})

Does a combinator like this exist?

Here is a stackblitz with example code if you need.

Current working code that combines the observables into a single stream and caches the results of each stream into BehavourSubjects.

const magicCombinator = <T = any>(...observables: Observable<T>[]) =>
  new Observable((subscriber: Subscriber<T>) => {
    // convert the given Observables into BehaviourSubjects so we can synchronously get the values
    const cache = observables.map(toBehavourSubject);
    // combine all the observables into a single stream
    merge(...observables)
      // map the stream output to the values of the BehavourSubjects
      .pipe(map(() => cache.map(item => item.getValue())))
      // trigger the combinator Observable
      .subscribe(subscriber);
  });

Upvotes: 14

Views: 9786

Answers (1)

Panagiotis Bougioukos
Panagiotis Bougioukos

Reputation: 19193

That is what you need

merge

Creates an output Observable which concurrently emits all values from every given input Observable

see merge doc

You can merge all observables together and listen with a single subscription to values emitted from each of them.

I would like to be able to watch multiple observables and perform some action when any of them change. Similar to how zip works, but without needing each observable to change its value.

Obsevables do not change value and do not keep state. They just emit values until they complete and then get closed.

If you want to be able to have all the emitted values at a given time then you should go with ReplaySubject. good article about replay subjects But with that you need to make a new subscription each time you want to retrieve all emitted values again.

Upvotes: 8

Related Questions