Limit concurrent http requests with RxJS and Angular2

I've seen this question asked before but the responses have always been unsatisfying for me, so i'll try and be precise:

I'm using https://www.npmjs.com/package/rxjs and want to queue up a lot of http requests and i want to chain those into other operations like this:

urls$.flatMap(x => fetchUrls(x)).subscribe(x => console.log(x));

url$ being a Observable and fetchUrls being a method returning a new Observable. This would work well, except the first stream comming from urls$ is pretty fast and the fetchUrls method is slow (doing http request/response). I end up with 200+ http requests running at once.

I think leaving 200+ pending http requests on the client would be asking for trouble so i want to somehow pool the requests and i read that it is possible to set the maximum concurrent requests when using mergeMap/flatMap, which is really nice.

I created a playground for the thing here: http://www.webpackbin.com/EkQXtBEwz I have a method called fakeHttpLookup which delays the response by a second and feeding it with a list of 200 urls - if run sequentially would take 200 seconds. With the maxConcurrent set to 5 i guess, best case it would take care of the whole stream in 40 seconds.

The thing is - I can't get it to work. It does take the first 5 but then halts. It never completes the whole stream.

I would expect to see the counter increment with 5 for each second passed, illustrating 5 concurrent http requests at a time.

Can anyone help? I can live with only running one request at a time - i could buffer of 5 of those at a time, but the important thing here is not to queue up all 200 at once - or only taking the first few.

TLDR: I want to be able to queue/pool http requests and the stream must be able to limit the number of requests without guessing at a hard coded wait delay.

Upvotes: 2

Views: 4204

Answers (1)

Mark van Straten
Mark van Straten

Reputation: 9425

.flatMap(), aka .mergeMap() has an optional argument 'concurrency'

mergeMap(project: function(value: T, ?index: number): Observable, resultSelector: function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any, concurrent: number)

So its simply a matter of:

function fetch(id) {
  return Rx.Observable.of(id)
    .do(i => console.log(`fetching request ${i}`))
    .delay(2 * 1000);
}

Rx.Observable.range(1, 20)
  .flatMap(id => fetch(id), null, 5)
  .subscribe(console.log);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.3/Rx.js"></script>

to execute 5 concurrent requests

Upvotes: 4

Related Questions