Aijaz

Reputation: 730

Hasty forkjoin alternative rxjs for observable chaining?

I have 5 different API calls to make, and right now they are all combined in a forkJoin. My new requirement is that subscribe should fire whenever any one of the observables resolves.

Is there an operator or some other trick in RxJS that lets me keep the calls combined but fires each time any observable resolves?

forkJoin(
        this.myService.api1(),
        this.myService.api2(),
        this.myService.api3(),
        this.myService.api4(),
        this.myService.api5()
    )
        .subscribe(
            ([r1,r2,r3,r4,r5]) => { ... do something })

Upvotes: 7

Views: 1528

Answers (2)

frido

Reputation: 14099

You can use merge to run your observables concurrently, like forkJoin, but emit their values immediately. To keep track of the order, use map to tag each value with the index of its source observable. Then use scan to carry the previous values forward, insert each new value at the right position in the array, and emit the accumulated array.

import { merge, Observable } from 'rxjs';
import { last, map, scan } from 'rxjs/operators';

export function forkJoinEarly(...sources: Observable<any>[]): Observable<any[]> {
  return merge(...sources.map((obs, index) => obs.pipe(
    // optional: only emit last value like forkJoin
    last(),
    // add the index of the observable to the output
    map(value => ({ index, value }))
  ))).pipe(
    // use scan to keep track of previous values and insert current values;
    // copy the array so the seed is never mutated and each emission is a new array
    scan((acc, curr) => {
      const next = [...acc];
      next[curr.index] = curr.value;
      return next;
    }, Array(sources.length).fill(undefined))
  );
}

https://stackblitz.com/edit/rxjs-gwch8m
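
To see what the scan step accumulates without pulling in RxJS, here is a minimal sketch of the same index-tagged reducer run over a plain array of arrivals. The names `Tagged` and `accumulate`, the three sample values, and the arrival order are all assumptions made up for illustration; they are not part of the answer above or of RxJS. This sketch copies the array on each step so that every snapshot stays distinct.

```typescript
// Illustrative only: `Tagged` and `accumulate` are hypothetical names, not RxJS APIs.
interface Tagged<T> {
  index: number; // position of the source in the original argument list
  value: T;      // value that source produced
}

// Same shape as a scan reducer: place the new value at its source's index
// and return a fresh array instead of mutating the accumulator.
function accumulate<T>(acc: (T | undefined)[], curr: Tagged<T>): (T | undefined)[] {
  const next = [...acc];
  next[curr.index] = curr.value;
  return next;
}

// Assumed scenario: three sources that finish in the order 2, 0, 1.
const arrivals: Tagged<string>[] = [
  { index: 2, value: 'c' },
  { index: 0, value: 'a' },
  { index: 1, value: 'b' },
];

// Collect every intermediate array, one per arrival, as scan would emit them.
const snapshots: (string | undefined)[][] = [];
let state: (string | undefined)[] = Array(3).fill(undefined);
for (const arrival of arrivals) {
  state = accumulate(state, arrival);
  snapshots.push(state);
}

console.log(snapshots);
// [ [undefined, undefined, 'c'], ['a', undefined, 'c'], ['a', 'b', 'c'] ]
```

Each arrival produces one partially filled array, and slots whose source has not finished yet stay undefined, which is what the subscriber of a merge/map/scan pipeline would have to handle.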

Upvotes: 1

alexhughesking

Reputation: 2007

You can use combineLatest to emit the latest value from each source observable. It won't emit until each source observable emits at least once, so you can use startWith to provide a starting value:

combineLatest(
        this.myService.api1().pipe(startWith(null)),
        this.myService.api2().pipe(startWith(null)),
        this.myService.api3().pipe(startWith(null)),
        this.myService.api4().pipe(startWith(null)),
        this.myService.api5().pipe(startWith(null))
    )
        .subscribe(
            ([r1,r2,r3,r4,r5]) => { ... do something })

The initial output will be [null, null, null, null, null]. As each observable emits, its value replaces the corresponding null in the array.

If you want to ignore the initial emission, you can use skip(1).

import { combineLatest, of } from 'rxjs';
import { delay, skip, startWith } from 'rxjs/operators';

const sourceOne = of('Hello').pipe(delay(1000));
const sourceTwo = of('World!').pipe(delay(2000));
const sourceThree = of('Goodbye').pipe(delay(3000));
const sourceFour = of('World!').pipe(delay(4000));

// emit the latest values as an array whenever any source emits;
// startWith(null) lets combineLatest emit before every source has produced a value,
// and skip(1) drops the initial all-null array
const example = combineLatest(
  sourceOne.pipe(startWith(null)),
  sourceTwo.pipe(startWith(null)),
  sourceThree.pipe(startWith(null)),
  sourceFour.pipe(startWith(null))
)
.pipe(skip(1));

//output:
//["Hello", null, null, null]
//["Hello", "World!", null, null]
//["Hello", "World!", "Goodbye", null]
//["Hello", "World!", "Goodbye", "World!"]
//Complete
const subscribe = example.subscribe({
  next: val => console.log(val),
  complete: () => console.log('Complete')
});

And here's a StackBlitz to try it out.

Upvotes: 1
