Reputation: 5017
I need to make multiple API calls sequentially and use each response in the next call. It looks somewhat like this:
from(Object.keys(someObj))
.pipe(
concatMap(key => this.getUsers(key))
)
.subscribe(res => ...)
The above code works ok but I needed a delay so some components can process the data between the API calls. So I did:
from(Object.keys(someObj))
.pipe(
concatMap(key => of(this.getUsers(key)).pipe(delay(1000)))
)
.subscribe(res => ...)
This returns a nested observable. How can I subscribe to the inner observable without nesting subscriptions?
I tried to do:
.pipe(
concatMap(key => of(this.getUsers(key)).pipe(delay(1000))),
mergeMap(res => res)
)
but this results in an incorrect order of responses.
Upvotes: 0
Views: 232
Reputation: 21638
You could use expand and process the result of each iteration, so the next call is not made until the previous one has been processed.
const { of, expand, delay, map, take, skip } = rxjs;
const keys = [1,2,3,4,5,6];
of(keys).pipe(
// Create an accumulator with the result from the last run and the remaining keys
map(keys => ({ keys, result: undefined })),
expand((acc) => getUsers(acc.keys[0]).pipe(map(response => processResponse(response, acc.keys)))),
skip(1), // We are not interested in the first emission with no result yet
map(acc => acc.result), // We only want the result not the whole accumulator
take(keys.length) // Complete the observable chain after processing all the keys
).subscribe(val => {
console.log(val);
});
function processResponse(response, keys) {
// This is where your expensive processing takes place
const result = `Result for ${response}`;
const [, ...remainingKeys] = keys;
return ({ result, keys: remainingKeys });
}
function getUsers(key) {
return of(`Response of ${key}`).pipe(
delay(Math.random() * 800) // Simulate a call that takes some time
);
}
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/7.8.0/rxjs.umd.min.js" integrity="sha512-v0/YVjBcbjLN6scjmmJN+h86koeB7JhY4/2YeyA5l+rTdtKLv0VbDBNJ32rxJpsaW1QGMd1Z16lsLOSGI38Rbg==" crossorigin="anonymous" referrerpolicy="no-referrer"></script>
Upvotes: 1
Reputation: 17762
I would be suspicious of using delays to give components enough time "to process the data between the api calls". You usually do not know how much time it takes to process the data so you may end up giving too much or too little time.
You should see whether you can use a more idiomatic RxJS style. What I mean is the following.
Let's assume you have a function processDataBetweenAPICalls(data) which synchronously processes the data received from an API call before the next API is called. In this case you could try something like this:
from(Object.keys(someObj))
.pipe(
concatMap(key => this.getUsers(key)),
tap(res => processDataBetweenAPICalls(res))
)
.subscribe(res => ...)
If instead the processing of the data between two API calls is not synchronous but asynchronous, then I would try to turn this processing into an Observable. In other words, I would try to create a function processDataBetweenAPICallsObs(data): Observable<any>. In this case the code could look like this:
from(Object.keys(someObj))
.pipe(
concatMap(key => this.getUsers(key)),
concatMap(res => processDataBetweenAPICallsObs(res))
)
.subscribe(res => ...)
Without knowing your use case in more detail, the above suggestions may not be on target.
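If a fixed pause between calls is still wanted, one way to avoid the nested observable from the question is to keep getUsers as the inner observable and append a timer to it, rather than wrapping the call in of(...). The following is only a sketch under assumptions: getUsers here is a hypothetical stand-in for this.getUsers, getUsersWithPause is an illustrative name, and the operator imports from 'rxjs' assume RxJS 7.2 or later.

```typescript
import { Observable, concat, concatMap, from, ignoreElements, of, timer } from 'rxjs';

// Hypothetical stand-in for this.getUsers(key); a real version would issue an HTTP request.
function getUsers(key: string): Observable<string> {
  return of(`users for ${key}`);
}

// Emits each response as soon as it arrives, then keeps the concatMap slot
// busy for pauseMs, so the next key is only requested after the pause.
function getUsersWithPause(keys: string[], pauseMs: number): Observable<string> {
  return from(keys).pipe(
    concatMap(key =>
      concat(
        getUsers(key),                         // flattened by concatMap, so no nesting
        timer(pauseMs).pipe(ignoreElements()), // emits nothing, completes after pauseMs
      ),
    ),
  );
}

getUsersWithPause(Object.keys({ a: 1, b: 2, c: 3 }), 1000)
  .subscribe(res => console.log(res));
```

Writing getUsers(key).pipe(delay(1000)) inside concatMap would also remove the nesting, but it postpones delivery of each response rather than leaving a gap after it, so it may not give the components the time they need.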
EDIT AFTER COMMENT STATING THAT PROCESSING IS SYNCHRONOUS
If the processing is synchronous, then as I understand it your goal is to start from an upstream such as from(Object.keys(someObj)) (or anything similar) and call an API (getUsers in this case) for each value of the upstream.
If my understanding is correct, then a simple concatMap should do the work.
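A minimal sketch of that idea follows, to make the ordering visible. It is an illustration under assumptions, not the original linked example: getUsers and processDataBetweenAPICalls are hypothetical stand-ins that only record when they run, the simulated latency is arbitrary, and the operator imports from 'rxjs' assume RxJS 7.2 or later.

```typescript
import { Observable, concatMap, delay, from, of, tap } from 'rxjs';

const events: string[] = [];

// Hypothetical stand-in for this.getUsers(key): records when the call starts
// and answers after a short simulated latency.
function getUsers(key: string): Observable<string> {
  events.push(`request ${key}`);
  return of(`users for ${key}`).pipe(delay(50));
}

// Hypothetical synchronous processing step.
function processDataBetweenAPICalls(res: string): void {
  events.push(`processed ${res}`);
}

const someObj = { a: 1, b: 2 };

// Resolves with the recorded events once every key has been handled.
const done = new Promise<string[]>(resolve => {
  from(Object.keys(someObj)).pipe(
    concatMap(key => getUsers(key)),             // one call at a time, in key order
    tap(res => processDataBetweenAPICalls(res)), // runs before the next call starts
  ).subscribe({ complete: () => resolve(events) });
});

done.then(log => console.log(log.join('\n')));
```

Because concatMap only subscribes to the next inner observable after the current one completes, and the response reaches tap before that completion, the log should alternate between a request and its processing, with no artificial delay needed.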
Upvotes: 1