Reputation: 15196
I've seen this question asked before, but the responses have always been unsatisfying for me, so I'll try to be precise:
I'm using https://www.npmjs.com/package/rxjs and want to queue up a lot of HTTP requests and chain them into other operations, like this:
urls$.flatMap(x => fetchUrls(x)).subscribe(x => console.log(x));
urls$ is an Observable and fetchUrls is a method returning a new Observable. This would work well, except that the first stream, coming from urls$, is pretty fast and the fetchUrls method is slow (it does an HTTP request/response). I end up with 200+ HTTP requests running at once.
I think leaving 200+ pending HTTP requests on the client would be asking for trouble, so I want to somehow pool the requests. I read that it is possible to set the maximum number of concurrent requests when using mergeMap/flatMap, which is really nice.
I created a playground at http://www.webpackbin.com/EkQXtBEwz with a method called fakeHttpLookup, which delays the response by a second, and I feed it a list of 200 URLs; run sequentially, that would take 200 seconds. With maxConcurrent set to 5, I'd guess the best case is that the whole stream is processed in 40 seconds.
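As a rough check on those numbers, here is a minimal sketch of the arithmetic. The helper name estimateSeconds is hypothetical, and it assumes every request takes exactly the same time and that a new batch starts only when the previous one has finished:

```javascript
// Hypothetical helper: best-case duration when requests run in equal-sized batches.
// Assumes every request takes the same time (1 second in the playground).
function estimateSeconds(requestCount, secondsPerRequest, maxConcurrent) {
  const batches = Math.ceil(requestCount / maxConcurrent);
  return batches * secondsPerRequest;
}

const sequentialSeconds = estimateSeconds(200, 1, 1); // 200 batches of 1 request
const pooledSeconds = estimateSeconds(200, 1, 5);     // 40 batches of 5 requests
console.log(sequentialSeconds, pooledSeconds);        // 200 40
```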
The thing is - I can't get it to work. It does take the first 5 but then halts. It never completes the whole stream.
I would expect to see the counter increment by 5 for each second that passes, illustrating 5 concurrent HTTP requests at a time.
Can anyone help? I can live with running only one request at a time, or I could buffer 5 of them at a time, but the important thing here is not to queue up all 200 at once, and not to take only the first few.
TL;DR: I want to be able to queue/pool HTTP requests, and the stream must be able to limit the number of requests without guessing at a hard-coded wait delay.
Upvotes: 2
Views: 4204
Reputation: 9425
.flatMap(), aka .mergeMap(), has an optional concurrency argument (named concurrent in the signature below):
mergeMap(project: function(value: T, ?index: number): Observable, resultSelector: function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any, concurrent: number)
So it's simply a matter of:
function fetch(id) {
  return Rx.Observable.of(id)
    .do(i => console.log(`fetching request ${i}`))
    .delay(2 * 1000);
}

Rx.Observable.range(1, 20)
  .flatMap(id => fetch(id), null, 5)
  .subscribe(console.log);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.3/Rx.js"></script>
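This executes at most 5 requests concurrently. The null fills the resultSelector slot so that 5 is passed as the concurrent parameter.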
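To make the expected timing concrete, here is a dependency-free sketch that models the scheduling in virtual time. It is not RxJS code and not how the library is implemented; the function simulateConcurrency and its fields are hypothetical, and it assumes every request takes the same fixed duration:

```javascript
// Virtual-time model of a concurrency limit (no RxJS needed).
// All names here are illustrative, not part of any library.
function simulateConcurrency(ids, durationMs, concurrent) {
  const pending = ids.slice(); // source values waiting for a free slot
  const active = [];           // requests currently "in flight"
  const timeline = [];         // one { id, start, end } entry per request
  let now = 0;                 // virtual clock in milliseconds

  while (pending.length > 0 || active.length > 0) {
    // Fill every free slot with the next waiting value.
    while (active.length < concurrent && pending.length > 0) {
      const entry = { id: pending.shift(), start: now, end: now + durationMs };
      active.push(entry);
      timeline.push(entry);
    }
    // Jump to the earliest completion and free the slots that finish then.
    now = Math.min(...active.map(e => e.end));
    for (let i = active.length - 1; i >= 0; i--) {
      if (active[i].end <= now) active.splice(i, 1);
    }
  }
  return timeline;
}

const ids = Array.from({ length: 20 }, (_, i) => i + 1); // 1..20, like range(1, 20)
const timeline = simulateConcurrency(ids, 2000, 5);
const totalMs = Math.max(...timeline.map(e => e.end));
console.log(totalMs); // 8000: four batches of 5, 2 seconds each
```

With the values from the snippet above (20 ids, a 2 second delay, a limit of 5), the model starts ids 1 to 5 at 0 ms, ids 6 to 10 at 2000 ms, and so on, which matches the "fetching request" log lines arriving in groups of five.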
Upvotes: 4