Gabriele Magno
Gabriele Magno

Reputation: 196

Unnesting subscriptions with RXJS

I am spending too much time trying to figure out what is the best solution to fix this nested subscription. I have tried without success mergeMap, flatMap and switchMap. Unfortunately the examples found are not quite what I needed so I end up with just one result, or an undefined or an error. The code to be fixed is:

this.myService.getAll().subscribe(res => {

// res.result is an array of 20 objects
res.result.forEach(m => {
    // for each of them I call 2 different endpoints adding a key with the response
    this.myService.checkFirst(m.id).subscribe(result => {
        m.first = result;
    });
    this.myService.checkSecond(m.id).subscribe(result => {
        m.second = result;
    });
});
// once all subscriptions are fulfilled I would like to return the mapped array
this.dataLength = res.total;
});

Upvotes: 1

Views: 52

Answers (3)

Mrk Sef
Mrk Sef

Reputation: 8022

One of the nice features with RxJS is that you can nest streams as deeply as you like. So if you can construct a stream that will enrich a single object, then you can nest 20 of them that will enrich the entire array.

So for one enriched object, a stream that prints the enriched object to the console might look like this:

const oneObject = getObject();
forkJoin({
  firstResult: this.myService.checkFirst(oneObject.id),
  secondResult: this.myService.checkSecond(oneObject.id)
}).pipe(
  map(({firstResult, secondResult}) => {
    oneObject.first = firstResult;
    oneObject.second = secondResult;
    return oneObject;
  })
).subscribe(
  console.log
);

What does this same thing look like if oneObject is itself returned from an observable? It's the same thing, only now we merge or switch our object into the same stream we created above.

this.myService.getOneObject().pipe(
  mergeMap(oneObject => 
    forkJoin({
      firstResult: this.myService.checkFirst(oneObject.id),
      secondResult: this.myService.checkSecond(oneObject.id)
    }).pipe(
      map(({firstResult, secondResult}) => {
        oneObject.first = firstResult;
        oneObject.second = secondResult;
        return oneObject;
      })
    )
  )
).subscribe(
  console.log
);

Now, there's one step left. To do this all for an entire array of objects. To accomplish this, we need a way to run an array of observables. Fortunately, we have forkJoin - the same operator that we use to run checkFirst and checkSecond concurrently. It can join the whole thing together as well. That might look like this:

this.myService.getAll().pipe(
  map(allRes =>
    allRes.result.map(m => 
      forkJoin({
        first: this.myService.checkFirst(m.id),
        second: this.myService.checkSecond(m.id)
      }).pipe(
        map(({first, second}) => {
          m.first = first;
          m.second = second;
          return m;
        })
      )
    )
  ),
  // forkJoin our array of streams, so that your 40 service calls (20 for 
  // checkFirst and 20 for checkSecond) are all combined into a single stream.
  mergeMap(mArr => forkJoin(mArr)),
).subscribe(resultArr => {
  // resultArr is an aray of length 20, with objects enriched with a .first
  // and a .second
  // Lets log the result for he first object our array.
  console.log(resultArr[0].first, resultArr[0].second)
});

Here's the same solution where I've collapsed our map and mergeMap into a single mergeMap:

this.myService.getAll().pipe(
  mergeMap(allRes =>
    forkJoin(allRes.result.map(m => 
      forkJoin({
        first: this.myService.checkFirst(m.id),
        second: this.myService.checkSecond(m.id)
      }).pipe(
        map(({first, second}) => {
          m.first = first;
          m.second = second;
          return m;
        })
      )
    ))
  )
).subscribe(console.log);

If you're not sure that checkFirst and checkSecond complete, you can zip instead of forkJoin, then unsubscribe with take(1) or first()

this.myService.getAll().pipe(
  mergeMap(allRes =>
    forkJoin(allRes.result.map(m => 
      zip(
        this.myService.checkFirst(m.id),
        this.myService.checkSecond(m.id)
      ).pipe(
        first(),
        map(([first, second]) => {
          m.first = first;
          m.second = second;
          return m;
        })
      )
    ))
  )
).subscribe(console.log);

Upvotes: 1

Picci
Picci

Reputation: 17762

If I understand right your problem, I would proceed like this.

I assume this.myService.getAll() is some sort of http call, so what you want is to do something as soon as this calls returns and the related Observable completes. For this the operator to use is concatMap that allows you to work with subsequent Observables as soon as the source one, this.myService.getAll() in this case, completes.

Now, once you have retrieves the result of this.myService.getAll(), you need to issue 2 calls for each item in the array returned. Such calls can run in parallel and each has the side effect of updating some properties of the item.

For running 2 calls in parallel you can use the forkJoin function, which returns an Observable which emits as soon as both calls have completed, and it emits an array with the results of each call. In other words, this code snipped should do the work for a single item

forkJoin([this.myService.checkFirst(m.id), this.myService.checkSecond(m.id)]).pipe(
   tap(([first, second]) => {
     m.first = first;
     m.second = second;
   })
)

Since you have 20 items in your array, you need to run 20 times the above logic, maybe in parallel. If this is the case, you can use again forkJoin to run the above requests for each item in the array.

So, stitching it al together, your solution could look something like this

this.myService.getAll().pipe(
  concatMap(res => {
    // reqestForItems is an array of Observables, each Observable created by calling the forkJoin that allows us to run the 2 calls in parallel
    const reqestForItems = res.result.map(m => 
      forkJoin([this.myService.checkFirst(m.id), this.myService.checkSecond(m.id)]).pipe(
        tap(([first, second]) => {
          m.first = first;
          m.second = second;
        })
      )
    )
    // return the result of the execution of requests for the items
    return forkJoin(reqestForItems).pipe(
      // since what is requested as result is the array with each item enriched with the data retrieved, you return the res object which has been modified by the above logic
      map(() => res)
    )
  })
)
.subscribe(res => // the res.result is an array of item where each item has been enriched with data coming from the service)

If you have to deal with Observbles and http use cases, you may find this article about Observable and http patterns interesting.

Upvotes: 2

MoxxiManagarm
MoxxiManagarm

Reputation: 9124

Try

this.myService.getAll().pipe(
  switchMap(res => {
    const obs$ = res.result.map(m => {
      return this.myService.checkFirst(m.id).pipe(
        map(first => ({...m, first})),
      );
    });
  
    return forkJoin(obs$).pipe(
      map(result => ({...res, result})),
    ),
  }),
  switchMap(res => {
    const obs$ = res.result.map(m => {
      return this.myService.checkSecond(m.id).pipe(
        map(second => ({...m, second})),
      );
    });
  
    return forkJoin(obs$).pipe(
      map(result => ({...res, result})),
    ),
  }),
).subscribe(/* ... */);

Upvotes: 2

Related Questions