t.888

Reputation: 3902

RxJS 5 - Emit a single composite value from an array of observables when all sources emit

I'm composing multiple observables using an observable's pipe method and I'd like to emit a final composite value when all observables in the array emit.

import { apiSvc } from '../lib/api-service'

import { of as observableOf } from 'rxjs/observable/of'
import { map } from 'rxjs/operators'

const uris = [
  '/api/items/1',
  '/api/items/2',
  '/api/items/3'
]

observableOf(uris).pipe(
  // Map uris array to an array of observables.
  map((uris) => uris.map((uri) => apiSvc.get(uri) /* returns observable */)),
  // Perform magic and emit when all calls complete.
  magic()
)
.subscribe((results) => {
  console.log(results) // [{id: 1}, {id: 2}, {id: 3}]
})

I was able to make this work with forkJoin:

import { forkJoin } from 'rxjs/observable/forkJoin'

observableOf(uris).pipe(
  // Map uris array to an array of observables.
  map((uris) => uris.map((uri) => apiSvc.get(uri))),
)
.subscribe((requests) => {
  // Emits when all request observables emit.
  forkJoin(requests).subscribe((results) => {
    console.log(results) // [{id: 1}, {id: 2}, {id: 3}]
  })
})

...but I'm looking for a way to get it done in the pipe chain without having to nest subscribe calls.

The zip operator is sort of in the ballpark but it doesn't appear to work on arrays of observables. Is there a lettable operator that works like forkJoin and can be used with pipe?

Upvotes: 0

Views: 855

Answers (1)

martin

Reputation: 96949

You were very close. Return the forkJoin Observable from inside the chain and flatten it with concatMap, so the outer stream emits once forkJoin does. mergeMap works equally well here because the source emits only once.

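import { concatMap } from 'rxjs/operators'

observableOf(uris)
  .pipe(
    // Map the uris array to request observables, join them with forkJoin,
    // and flatten the result into the outer stream.
    concatMap(uris => forkJoin(uris.map(uri => apiSvc.get(uri)))),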
  )
  .subscribe((responses) => {
    ...
  });
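
To see why this avoids the nested subscribe, here is a dependency-free sketch of the mechanics. It is not RxJS's implementation: of, forkJoinLike, mergeMapLike and the fakeGet stub are all hypothetical, simplified stand-ins, and an "observable" is modelled as a plain function that takes an observer. The flattening step subscribes to the inner joined observable for you and forwards its single array to the outer observer.

```javascript
// Simplified stand-ins, NOT RxJS's implementation. An "observable" here is
// just a function that accepts an observer and pushes values into it.
const of = (...values) => (observer) => {
  values.forEach((value) => observer.next(value));
  observer.complete();
};

// Hypothetical API stub: emits { id } parsed from the uri, then completes.
const fakeGet = (uri) => of({ id: Number(uri.split('/').pop()) });

// forkJoin-like: once every source has completed, emit one array holding
// each source's last value, then complete.
// (Simplification: assumes every source emits at least once.)
const forkJoinLike = (sources) => (observer) => {
  if (sources.length === 0) return observer.complete();
  const last = new Array(sources.length);
  let remaining = sources.length;
  sources.forEach((source, i) =>
    source({
      next: (value) => { last[i] = value; },
      complete: () => {
        remaining -= 1;
        if (remaining === 0) {
          observer.next(last);
          observer.complete();
        }
      },
    })
  );
};

// mergeMap-like: subscribe to the inner observable produced for each outer
// value and forward whatever it emits to the outer observer.
const mergeMapLike = (project) => (source) => (observer) => {
  let active = 0;
  let outerDone = false;
  const finish = () => { if (outerDone && active === 0) observer.complete(); };
  source({
    next: (value) => {
      active += 1;
      project(value)({
        next: (inner) => observer.next(inner),
        complete: () => { active -= 1; finish(); },
      });
    },
    complete: () => { outerDone = true; finish(); },
  });
};

const uris = ['/api/items/1', '/api/items/2', '/api/items/3'];
const results = [];

mergeMapLike((list) => forkJoinLike(list.map(fakeGet)))(of(uris))({
  next: (value) => results.push(value),
  // Logs the three { id } objects as a single array.
  complete: () => console.log(results[0]),
});
```

The outer observer receives exactly one value, the joined array, which is the same shape the nested forkJoin(...).subscribe(...) produced in the question.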

Upvotes: 2
