Reputation: 730
I have 5 different API calls to make, and they are all currently combined with forkJoin. My new requirement is that the subscribe callback should fire every time any one of the observables resolves.
Is there an operator or some other trick in RxJS that lets me keep this structure but have it fire each time any observable resolves?
forkJoin(
  this.myService.api1(),
  this.myService.api2(),
  this.myService.api3(),
  this.myService.api4(),
  this.myService.api5()
)
.subscribe(
  ([r1,r2,r3,r4,r5]) => { ... do something })
Upvotes: 7
Views: 1528
Reputation: 14099
You can use merge to execute your observables simultaneously like forkJoin but emit their values immediately. To keep track of the order, add the index of the observable to its output with map. Use scan to keep track of previous values, insert current values at the right position in the array and emit the accumulated data.
import { merge, Observable } from 'rxjs';
import { last, map, scan } from 'rxjs/operators';

export function forkJoinEarly(...sources: Observable<any>[]): Observable<any[]> {
  return merge(...sources.map((obs, index) => obs.pipe(
    // optional: only emit last value like forkJoin
    last(),
    // add the index of the observable to the output
    map(value => ({ index, value }))
  ))).pipe(
    // use scan to keep track of previous values and insert current values;
    // copy the array so the seed is never mutated and each emission is a new array
    scan((acc, curr) => {
      const next = [...acc];
      next[curr.index] = curr.value;
      return next;
    }, Array(sources.length).fill(undefined))
  );
}
https://stackblitz.com/edit/rxjs-gwch8m
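To make the accumulation step easier to picture, here is a minimal sketch that replays it without RxJS. It only mimics what the scan step does with the index-tagged values. The names Tagged, insertAt and traceEmissions are hypothetical helpers made up for this illustration, and the arrival order is an assumption. This version copies the array on every step so each snapshot stays independent.

```typescript
// Hypothetical names throughout: Tagged, insertAt and traceEmissions are
// illustrative helpers, not part of RxJS.
interface Tagged<T> {
  index: number; // position of the source observable in the argument list
  value: T;      // value that source emitted
}

// Same job as the scan accumulator: place the value at its source's slot.
// Returns a copy so earlier snapshots are not modified afterwards.
function insertAt<T>(acc: (T | undefined)[], curr: Tagged<T>): (T | undefined)[] {
  const next = acc.slice();
  next[curr.index] = curr.value;
  return next;
}

// Replays arrivals in order and collects every intermediate array,
// which is what a subscriber would receive one emission at a time.
function traceEmissions<T>(sourceCount: number, arrivals: Tagged<T>[]): (T | undefined)[][] {
  const snapshots: (T | undefined)[][] = [];
  let acc: (T | undefined)[] = new Array<T | undefined>(sourceCount).fill(undefined);
  for (const arrival of arrivals) {
    acc = insertAt(acc, arrival);
    snapshots.push(acc);
  }
  return snapshots;
}

// Assumed scenario: three calls where the third responds first,
// then the first, then the second.
const snapshots = traceEmissions<string>(3, [
  { index: 2, value: 'c' },
  { index: 0, value: 'a' },
  { index: 1, value: 'b' },
]);

console.log(snapshots);
// [undefined, undefined, 'c']
// ['a', undefined, 'c']
// ['a', 'b', 'c']
```

Each result lands in the slot of the call that produced it, regardless of arrival order, so the destructuring ([r1, r2, r3]) from the question keeps working and slots that have not arrived yet are undefined.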
Upvotes: 1
Reputation: 2007
You can use combineLatest to emit the latest value from each source observable. It won't emit until each source observable emits at least once, so you can use startWith to provide a starting value:
combineLatest(
  this.myService.api1().pipe(startWith(null)),
  this.myService.api2().pipe(startWith(null)),
  this.myService.api3().pipe(startWith(null)),
  this.myService.api4().pipe(startWith(null)),
  this.myService.api5().pipe(startWith(null))
)
.subscribe(
  ([r1,r2,r3,r4,r5]) => { ... do something })
The initial output will be [null, null, null, null, null]. When each observable emits, it will replace the corresponding null value in the array.
If you want to ignore the initial emission, you can use skip(1).
import { combineLatest, of } from 'rxjs';
import { delay, skip, startWith } from 'rxjs/operators';

const sourceOne = of('Hello').pipe(delay(1000));
const sourceTwo = of('World!').pipe(delay(2000));
const sourceThree = of('Goodbye').pipe(delay(3000));
const sourceFour = of('World!').pipe(delay(4000));
// emit an array each time any source emits; sources that have not emitted yet stay null
const example = combineLatest(
  sourceOne.pipe(startWith(null)),
  sourceTwo.pipe(startWith(null)),
  sourceThree.pipe(startWith(null)),
  sourceFour.pipe(startWith(null))
)
// skip the initial [null, null, null, null]
.pipe(skip(1));
//output:
//["Hello", null, null, null]
//["Hello", "World!", null, null]
//["Hello", "World!", "Goodbye", null]
//["Hello", "World!", "Goodbye", "World!"]
//Complete
const subscription = example.subscribe({
  next: val => console.log(val),
  complete: () => console.log('Complete')
});
And here's a StackBlitz to try it out.
Upvotes: 1