tbarbot

Reputation: 257

RxJS : how to transform an Observable<Observable<Thing>> into Observable<Thing[]>

I'm not an expert with RxJS pipes, and I have a problem that I think is simple. My code boils down to:

getThings = (): Observable<Thing[]> => 
    this.getThingIds().pipe(
        mergeMap((thingIds: number[]) => 
            thingIds.map((id: number) => this.http.get<Thing>(`url${id}`))
        )
    );

The problem is that this returns an Observable<Observable<Thing>>. Is there an operator that transforms this into the Observable<Thing[]> I need, or am I going about this the wrong way from the start?

Basically, I need to make one request for each ID received from getThingIds and collect all of those results into an array.

Upvotes: 1

Views: 184

Answers (2)

Joshua McCarthy

Reputation: 1852

I wanted to offer a solution that delivers results sooner. While forkJoin() does accomplish this, your subscriber must wait until all Things have been fetched before it receives any data.

The following solution accomplishes the same thing, but emits an array that grows as each HTTP request completes (in completion order, which may differ from the order of the ids).

getThings$ = this.getThingIds$.pipe(
  switchMap((ids: number[]) =>
    from(ids).pipe(
      mergeMap(id => this.http.get<Thing>(`url${id}`)),
      scan((things, thing) => [...things, thing], [] as Thing[])
    )
  )
);

Here's the line-by-line breakdown:

  • We start with switchMap() instead of mergeMap(). mergeMap() keeps every inner observable subscribed, so requests for an old array of ids would keep running and emitting alongside the new ones, while switchMap() unsubscribes from the previous inner observable, which cancels any in-progress HTTP requests. If we get a new array of ids, we don't want to wait for the old ids to resolve first.
  • Next we use from(), which creates an observable that emits each value of our id array.
  • Now we issue one HTTP request per id using mergeMap(), which runs the requests concurrently and emits each Thing as its response arrives.
  • Next up is the scan() operator, which is like Array.reduce() but emits the accumulated value after every value from the previous line (the HTTP responses) instead of only once at the end. We use the spread operator to append each new Thing to the accumulated array.

Note: I changed the methods to variables since none of them required any parameters. It just removes the tiniest bit of boilerplate.
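
To see the growing array in isolation, here is a minimal sketch of the same pipeline outside of a class. It assumes RxJS 6 or later; fetchThing and thingIds$ are hypothetical stand-ins for this.http.get<Thing>() and getThingIds$, and they emit synchronously so the emissions can be inspected right after subscribing.

```typescript
import { Observable, from, of } from 'rxjs';
import { mergeMap, scan, switchMap } from 'rxjs/operators';

interface Thing { id: number; }

// Hypothetical stand-in for this.http.get<Thing>(`url${id}`); emits synchronously.
const fetchThing = (id: number): Observable<Thing> => of({ id });

// Hypothetical stand-in for getThingIds$.
const thingIds$: Observable<number[]> = of([1, 2, 3]);

const things$: Observable<Thing[]> = thingIds$.pipe(
  // A new id array replaces the previous inner pipeline.
  switchMap((ids: number[]) =>
    from(ids).pipe(
      // One request per id, all running concurrently.
      mergeMap((id) => fetchThing(id)),
      // Emit the accumulated array after every response.
      scan((things: Thing[], thing: Thing) => [...things, thing], [] as Thing[])
    )
  )
);

// Record every emission so the growth of the array is visible.
const emissions: Thing[][] = [];
things$.subscribe((things) => emissions.push(things));
```

With these stand-ins the subscriber receives three arrays of length 1, 2 and 3. With real HTTP calls the elements arrive in completion order, so sort the array in the subscriber if the order of the ids matters.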

Upvotes: 0

eko

Reputation: 40647

I don't think you're far off. You can combine them with combineLatest or forkJoin, e.g.:

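getThings = (): Observable<Thing[]> =>
    this.getThingIds().pipe(
        mergeMap((thingIds: number[]) => {
            // Build one request per id, typed explicitly so the array is not an implicit any[].
            const things$: Observable<Thing>[] = thingIds.map((id: number) => this.http.get<Thing>(`url${id}`));
            // forkJoin emits a single Thing[] (in id order) once every request has completed.
            return forkJoin(things$);
        })
    );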
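
As a standalone sketch of the forkJoin approach, the snippet below can be run outside of Angular. It assumes RxJS 6 or later; fetchThing and getThingIds are hypothetical stand-ins for this.http.get<Thing>() and this.getThingIds(). The guard for an empty id list is an addition that the answer does not include: forkJoin completes without emitting when given an empty array, so without the guard a subscriber would never receive [].

```typescript
import { Observable, forkJoin, of } from 'rxjs';
import { mergeMap } from 'rxjs/operators';

interface Thing { id: number; }

// Hypothetical stand-in for this.http.get<Thing>(`url${id}`); emits synchronously.
const fetchThing = (id: number): Observable<Thing> => of({ id });

// Hypothetical stand-in for this.getThingIds().
const getThingIds = (): Observable<number[]> => of([1, 2, 3]);

const getThings = (): Observable<Thing[]> =>
  getThingIds().pipe(
    mergeMap((thingIds: number[]) =>
      // forkJoin([]) completes without emitting, so emit an empty array explicitly.
      thingIds.length > 0
        ? forkJoin(thingIds.map((id) => fetchThing(id)))
        : of([] as Thing[])
    )
  );

// Record every emission; forkJoin should produce exactly one.
const results: Thing[][] = [];
getThings().subscribe((things) => results.push(things));
```

Unlike the scan()-based variant, this emits once, and the array keeps the order of the ids regardless of which request finishes first.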

Upvotes: 3
