Reputation: 6282
I would like to be able to watch multiple observables and perform some action when any of them change. Similar to how zip works, but without needing each observable to change its value. Similarly forkJoin is not suitable as it only fires when all watched observables have fired.
eg. ideally when any observable value one$
, two$
or three$
changes the subscribe function should trigger giving the current state of the Observable. at present I am using BehavourSubject's to the value should be accessible using BehaviourSubject.getValue()
magicCombinator(
one$,
two$,
three$,
).subscribe(([ one, two, three ]) => {
console.log({ one, two, three });
})
Does a combinator like this exist?
Here is a stackblitz with example code if you need.
Current working code that combines the observables into a single stream and caches the results of each stream into BehavourSubjects.
const magicCombinator = <T = any>(...observables: Observable<T>[]) =>
new Observable((subscriber: Subscriber<T>) => {
// convert the given Observables into BehaviourSubjects so we can synchronously get the values
const cache = observables.map(toBehavourSubject);
// combine all the observables into a single stream
merge(...observables)
// map the stream output to the values of the BehavourSubjects
.pipe(map(() => cache.map(item => item.getValue())))
// trigger the combinator Observable
.subscribe(subscriber);
});
Upvotes: 14
Views: 9786
Reputation: 19193
That is what you need
merge
Creates an output Observable which concurrently emits all values from every given input Observable
You can merge all observables together and listen with a single subscription to values emitted from each of them.
I would like to be able to watch multiple observables and perform some action when any of them change. Similar to how zip works, but without needing each observable to change its value.
Obsevables do not change value and do not keep state. They just emit values until they complete and then get closed.
If you want to be able to have all the emitted values at a given time then you should go with ReplaySubject. good article about replay subjects But with that you need to make a new subscription each time you want to retrieve all emitted values again.
Upvotes: 8