Reputation: 45
I have an array of objects. For each object I need to trigger an asynchronous request (HTTP call), but I only want a certain maximum number of requests running at the same time. Also, it would be nice (but not necessary) to have one single synchronization point after all requests have finished, so I can execute some code there.
I've tried suggestions from:
Limit number of requests at a time with RxJS
How to limit the concurrency of flatMap?
Fire async request in parallel but get result in order using rxjs
and many more... I even tried making my own operators.
Either the answers on those pages are too old to work with my code or I can't figure out how to put everything together so all types fit nicely.
This is what I have so far:
for (const obj of objects) {
  this.myService.updateObject(obj).subscribe(value => {
    this.anotherService.set(obj);
  });
}
EDIT 1: Ok, I think we're getting there! With the answers of Julius and pschild (both seem to work equally) I managed to limit the number of requests. But now it will only fire the first batch of 4 and never fire the rest. So now I have:
const concurrentRequests = 4;
from(objects)
  .pipe(
    mergeMap(obj => this.myService.updateObject(obj), concurrentRequests),
    tap(result => this.anotherService.set(result))
  ).subscribe();
Am I doing something wrong with the subscribe()?
Btw: The mergeMap with resultSelector parameter is deprecated, so I used mergeMap without it.
Also, the obj of the mergeMap is not visible in the tap, so I had to use tap's parameter.
EDIT 2:
Make sure your observables complete! mergeMap only starts the next request once a running one has completed. (It cost me a whole day)
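To illustrate the stall described in EDIT 1 and EDIT 2, here is a minimal sketch. It assumes a hypothetical `neverCompleting` function standing in for a request that emits a value but never completes (it is not part of the original code). With a concurrency of 2, the first two inner observables occupy both slots forever; adding `take(1)` forces each one to complete so the remaining items get their turn.

```typescript
import { NEVER, Observable, concat, from, of } from 'rxjs';
import { mergeMap, take } from 'rxjs/operators';

// Hypothetical request: emits one value, then stays open forever.
function neverCompleting(id: number): Observable<number> {
  return concat(of(id), NEVER);
}

const ids = [1, 2, 3, 4, 5];

// Without completion, the two slots are never freed.
const stalled: number[] = [];
from(ids)
  .pipe(mergeMap(id => neverCompleting(id), 2))
  .subscribe(value => stalled.push(value));

// take(1) completes each inner observable after its first value.
const finished: number[] = [];
from(ids)
  .pipe(mergeMap(id => neverCompleting(id).pipe(take(1)), 2))
  .subscribe(value => finished.push(value));

console.log(stalled);  // [1, 2]
console.log(finished); // [1, 2, 3, 4, 5]
```

Whether `take(1)` is the right fix depends on the service; if the underlying observable is supposed to emit more than once, a different completion condition would be needed.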
Upvotes: 2
Views: 4497
Reputation: 3138
You can use the concurrent parameter of mergeMap (the second argument when no resultSelector is passed) to limit the number of concurrent inner subscriptions. Use finalize to execute something after all requests have finished:
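const concurrentRequests = 5;
from(objects)
  .pipe(
    mergeMap(obj => this.myService.updateObject(obj), concurrentRequests),
    tap(res => this.anotherService.set(res)),
    // Runs once when the stream completes, errors, or is unsubscribed
    finalize(() => console.log('Sequence complete'))
  )
  .subscribe(); // nothing is requested until something subscribes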
See the example on Stackblitz.
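As a rough, self-contained sketch of how the concurrency limit and finalize interact, the snippet below replaces the real service with a hypothetical `fakeUpdate` whose responses are triggered by hand through a hypothetical `respond` helper. Neither name comes from the answer; they only make the ordering visible without real HTTP calls.

```typescript
import { Subject, defer, from } from 'rxjs';
import { finalize, mergeMap } from 'rxjs/operators';

const started: number[] = [];
const log: string[] = [];
const pending = new Map<number, Subject<string>>();

// Hypothetical stand-in for this.myService.updateObject(obj).
function fakeUpdate(id: number) {
  return defer(() => {
    started.push(id); // records when mergeMap actually subscribes
    const response = new Subject<string>();
    pending.set(id, response);
    return response;
  });
}

// Hypothetical helper: deliver the response for a started request.
function respond(id: number): void {
  const response = pending.get(id);
  if (!response) throw new Error(`request ${id} was never started`);
  response.next(`updated ${id}`);
  response.complete(); // completion is what frees the slot
}

from([1, 2, 3, 4, 5])
  .pipe(
    mergeMap(id => fakeUpdate(id), 2),
    finalize(() => log.push('all requests finished'))
  )
  .subscribe(result => log.push(result));

const startedInitially = [...started];      // [1, 2]
respond(2);
const startedAfterOneResponse = [...started]; // [1, 2, 3]
[1, 3, 4, 5].forEach(respond);
console.log(log);
```

Results arrive in response order, not in input order, and the finalize callback is the last entry in the log.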
Upvotes: 4
Reputation: 11000
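from(objects).pipe(
  // Split the objects into batches of 10
  bufferCount(10),
  // Run one batch at a time; forkJoin waits for every request in the batch
  concatMap(objs => forkJoin(objs.map(obj =>
    this.myService.updateObject(obj).pipe(
      tap(value => this.anotherService.set(obj))
    )))),
  finalize(() => console.log('all requests are done'))
).subscribe();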
Code is not tested, but you get the idea. Let me know if you hit any errors or need an explanation.
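For comparison, a small sketch of what the batching approach emits. Here `of(id * 10)` is only a hypothetical stand-in for the HTTP call, and the batch size of 2 is chosen to keep the output short. Each emission is an array holding the results of one whole batch, and the next batch does not start until every request of the current one has completed.

```typescript
import { forkJoin, from, of } from 'rxjs';
import { bufferCount, concatMap } from 'rxjs/operators';

const batches: number[][] = [];

from([1, 2, 3, 4, 5])
  .pipe(
    bufferCount(2),
    // of(id * 10) is a hypothetical stand-in for an HTTP call
    concatMap(ids => forkJoin(ids.map(id => of(id * 10))))
  )
  .subscribe(batch => batches.push(batch));

console.log(batches); // [[10, 20], [30, 40], [50]]
```

Unlike mergeMap with a concurrency limit, one slow request holds up its whole batch, so fewer requests than the limit may be running at times.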
Upvotes: 2
Reputation: 2590
I had the same issue once, when I tried to load multiple images from a server. I had to send the HTTP requests one after another. I achieved the desired outcome using an awaited promise. Here is the sample code:
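async ngOnInit() {
  for (const number of this.numbers) {
    // Wait for each request to finish before starting the next one
    await new Promise<void>((resolve, reject) => {
      this.http.get(`https://jsonplaceholder.typicode.com/todos/${number}`).subscribe({
        next: data => {
          this.responses.push(data);
          console.log(data);
          resolve();
        },
        error: reject // otherwise a failed request would hang the loop forever
      });
    });
  }
}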
The main idea is to resolve the promise once you get the response. With this technique you can add custom logic after the loop that runs once all the requests have finished.
Here is the stackblitz. Open up the console to see it in action. :)
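The loop above runs strictly one request at a time. If more than one should run in parallel, the same promise idea can be extended with a small worker pool. This is only a sketch, not part of the answer: `runLimited`, `fakeGet`, and `stats` are hypothetical names, and `fakeGet` simulates a request with a timer instead of calling a server.

```typescript
// Tracks how many simulated requests run at once (for illustration only).
const stats = { inFlight: 0, peak: 0 };

// Hypothetical stand-in for an HTTP call that resolves after a short delay.
function fakeGet(id: number): Promise<string> {
  stats.inFlight++;
  stats.peak = Math.max(stats.peak, stats.inFlight);
  return new Promise<string>(resolve => {
    setTimeout(() => {
      stats.inFlight--;
      resolve(`todo ${id}`);
    }, 5);
  });
}

// Runs task over items with at most `limit` tasks pending at any time.
async function runLimited<T, R>(
  items: T[],
  limit: number,
  task: (item: T) => Promise<R>
): Promise<R[]> {
  const results: R[] = new Array(items.length);
  let next = 0;
  // Each worker repeatedly claims the next unprocessed index.
  async function worker(): Promise<void> {
    while (next < items.length) {
      const index = next++;
      results[index] = await task(items[index]);
    }
  }
  const workers = Array.from({ length: Math.min(limit, items.length) }, () => worker());
  await Promise.all(workers); // single synchronization point
  return results;
}
```

Calling `runLimited([1, 2, 3, 4, 5], 2, fakeGet)` resolves with the results in input order once every request has finished, which gives the single synchronization point the question asks for.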
Upvotes: 0