zer0
zer0

Reputation: 5017

Avoiding nested subscriptions

I have a requirement of doing multiple API calls sequentially and using each response in next call. It looks somewhat like this:

from(Object.keys(someObj))
  .pipe(
    concatMap(key => this.getUsers(key)
  )
  .subscribe(res => ...)

The above code works ok but I needed a delay so some components can process the data between the API calls. So I did:

from(Object.keys(someObj))
  .pipe(
    concatMap(key => of(this.getUsers(key)).pipe(delay(1000))
   )
  .subscribe(res => ...)

This returns a nested observable. How can I subscribe to the inner observable without nesting subscriptions?

I tried to do:

.pipe(
  concatMap(key => of(this.getUsers(key)).pipe(delay(1000)), 
  mergeMap(res => res)
) 

but this, results in an incorrect order of responses.

Upvotes: 0

Views: 232

Answers (2)

Adrian Brand
Adrian Brand

Reputation: 21638

You could use an expand and process the result of each iteration so the next call is not made until the last one is processed.

const { of, expand, switchMap, delay, map, take, skip } = rxjs;

const keys = [1,2,3,4,5,6];

of(keys).pipe(
  // Create an accumulator with the result from the last run and the remaining keys
  map(keys => ({ keys, result: undefined })),
  expand((acc) => getUsers(acc.keys[0]).pipe(map(response => processResponse(response, acc.keys)))),
  skip(1), // We are not interested in the first emission with no result yet
  map(acc => acc.result), // We only want the result not the whole accumulator
  take(keys.length) // Complete the observable chain after processing all the keys
).subscribe(val => {
  console.log(val);
});

function processResponse(response, keys) {
  // This is where your expensive processing takes place
  const result = `Result for ${response}`;
  const [, ...remainingKeys] = keys;
  return ({ result, keys: remainingKeys });
}

function getUsers(key) {
  return of(`Response of ${key}`).pipe(
    delay(Math.random() * 800) // Simulate a call that takes some time
  );
}
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/7.8.0/rxjs.umd.min.js" integrity="sha512-v0/YVjBcbjLN6scjmmJN+h86koeB7JhY4/2YeyA5l+rTdtKLv0VbDBNJ32rxJpsaW1QGMd1Z16lsLOSGI38Rbg==" crossorigin="anonymous" referrerpolicy="no-referrer"></script>

Upvotes: 1

Picci
Picci

Reputation: 17762

I would be suspicious of using delays to give components enough time "to process the data between the api calls". You usually do not know how much time it takes to process the data so you may end up giving too much or too little time.

You should try to see if you can use more of rxjs idiomatic style. What I mean is the following.

Let's assume you have a function processDataBetweenAPICalls(data) which synchronously process the data received from an API call before calling the following API. In this case you could try something like this

from(Object.keys(someObj))
  .pipe(
    concatMap(key => this.getUsers(key)),
    tap(res => processDataBetweenAPICalls(res)
   )
  .subscribe(res => ...)

If instead the processing of the data between 2 API calls is not synchronous but rather is an async process, then I would try to turn this processing into an Observable, in other words I would try to create a function processDataBetweenAPICallsObs(data): Observable<any>. In this case the code could look like

from(Object.keys(someObj))
  .pipe(
    concatMap(key => this.getUsers(key)),
    concatMap(res => processDataBetweenAPICallsObs(res)
   )
  .subscribe(res => ...)

Not knowing in more details your use case, the above suggestions may be not on target.

EDIT AFTER COMMENT STATING THAT PROCESSING IS SYNCHRONOUS

If the processing is synchronous, as for my understanding your goal should be:

  1. get a value from the stream of values generated by from(Object.keys(someObj)) (or anything similar
  2. call an API (getUsers in this case) for each value of the upstream
  3. do whatever synchronous processing you have to do for with the response returned from the API call
  4. only after the above processing is completed, move to the next value of the source stream and repeat steps 1, 2 and 3 until the source stream completes or somewhere an error occurs

If my understanding is correct, then a simple concatMap should do the work like in this example.

Upvotes: 1

Related Questions