thalesmello

Reputation: 3439

Limit number of requests at a time with RxJS

Suppose I have 10 urls, and I wish to make an HTTP request for each one of them.

I could create an observable of the URLs, then .flatMap() the requests for each one of them, and then .subscribe() to the results. But that would make all of the requests at once.

Is there a way to limit the number of concurrent requests to a fixed number, so as not to overload the server?

Upvotes: 18

Views: 14410

Answers (5)

Eylon Sultan

Reputation: 1036

RxJS v6 update

Pipe through mergeMap, passing your concurrency limit as the second parameter:

import { from } from 'rxjs';
import { mergeMap } from 'rxjs/operators';

const MAX_PARALLEL_QUERIES = 3;
let allResults = [];
let observables = []; // fill with observables
from(observables)
  .pipe(mergeMap(observable => observable, MAX_PARALLEL_QUERIES))
  .subscribe(
    partialResults => {
      // collect each emitted result (concat flattens emitted arrays)
      allResults = allResults.concat(partialResults);
    },
    err => {
      // handle error
    },
    () => {
      // reached once all observables have completed
      allResults.forEach(result => {
        // do what you want
      });
    }
  );

Upvotes: 16

sra

Reputation: 6408

It's 2018, RxJS 5 is here, and this is how I solved it:

urls$
  .mergeMap((url) => request({ url }), null, 10)
  .subscribe()

mergeMap (aka flatMap) already takes the "max concurrency" as its third parameter (see the docs).

By the way, I am using universal-rxjs-ajax (the request function above) for Node compatibility, but it should work the same with Observable.ajax.

Upvotes: 7

vanugrah

Reputation: 1

Just a side note here: in RxJS v5, fromCallback() and fromNodeCallback() were renamed to bindCallback() and bindNodeCallback() respectively. Link for further reading: bindNodeCallback

Upvotes: 0

cyrilluce

Reputation: 985

I had the same problem and have now found the solution (see this answer).

If you create the observable with fromNodeCallback, or return a Promise, you get a hot observable: the work starts as soon as the function is called inside flatMap, not when the result is subscribed to. As a result, flatMapWithMaxConcurrent (or map followed by merge) does not limit concurrency as expected.

var start = Date.now()
var now = ()=>Date.now()-start
var test = Rx.Observable.fromNodeCallback(function(i, cb){
	console.log(i, 'access', now());
	setTimeout(function(){
		cb(null, i)
	}, 1000);
})

Rx.Observable.from([1,2,3])
	.flatMapWithMaxConcurrent(1, x=>test(x) )
	.subscribe(x=>console.log(x, 'finish', now()))

/* output:
1 access 2
2 access 16
3 access 16
1 finish 1016
2 finish 1016
3 finish 1017
*/
<script src="http://www.cdnjs.net/ajax/libs/rxjs/4.1.0/rx.all.js"></script>

You need to convert it to a cold observable; simply wrap the call in Rx.Observable.defer.

var start = Date.now()
var now = ()=>Date.now()-start
var test = Rx.Observable.fromNodeCallback(function(i, cb){
	console.log(i, 'access', now());
	setTimeout(function(){
		cb(null, i)
	}, 1000);
})


Rx.Observable.from([1,2,3])
	.flatMapWithMaxConcurrent(1, x=>Rx.Observable.defer( ()=>test(x)) )
//	.map(x=>Rx.Observable.defer( ()=>test(x)) ).merge(1) // this works too
	.subscribe(x=>console.log(x, 'finish', now()))
    
/* output:
1 access 3
1 finish 1004
2 access 1005
2 finish 2005
3 access 2006
3 finish 3006
*/
<script src="http://www.cdnjs.net/ajax/libs/rxjs/4.1.0/rx.all.js"></script>
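
The same eager-versus-deferred distinction can be seen without RxJS at all. Below is a minimal plain-JavaScript sketch, not the RxJS mechanism itself: fakeRequest, runLimited and demo are hypothetical names invented for this illustration, and the 20 ms timeout stands in for a real HTTP call. It shows that work started by calling a function cannot be limited afterwards, while work wrapped in a function (the role defer plays above) can be started one at a time.

```javascript
// Hypothetical stand-in for a request: it starts as soon as it is called
// and records how many calls are running at the same moment.
function fakeRequest(i, stats) {
  stats.inFlight += 1;
  stats.max = Math.max(stats.max, stats.inFlight);
  return new Promise(resolve => {
    setTimeout(() => {
      stats.inFlight -= 1;
      resolve(i);
    }, 20);
  });
}

// Hypothetical helper: runs functions that return promises ("thunks"),
// keeping at most `limit` of them running at once.
async function runLimited(thunks, limit) {
  const results = new Array(thunks.length);
  let next = 0;
  async function worker() {
    while (next < thunks.length) {
      const index = next++;
      // The work only starts here, when the thunk is invoked.
      results[index] = await thunks[index]();
    }
  }
  const workerCount = Math.min(limit, thunks.length);
  await Promise.all(Array.from({ length: workerCount }, () => worker()));
  return results;
}

async function demo() {
  // Eager: mapping to promises starts all three requests immediately,
  // so no later operator can hold any of them back.
  const eagerStats = { inFlight: 0, max: 0 };
  await Promise.all([1, 2, 3].map(i => fakeRequest(i, eagerStats)));

  // Deferred: mapping to thunks starts nothing until runLimited calls them.
  const deferredStats = { inFlight: 0, max: 0 };
  const thunks = [1, 2, 3].map(i => () => fakeRequest(i, deferredStats));
  const results = await runLimited(thunks, 1);

  return { eagerMax: eagerStats.max, deferredMax: deferredStats.max, results };
}

demo().then(summary => console.log(summary));
```

With a limit of 1, the deferred run never has more than one request in flight, whereas the eager run has all three in flight at once, which mirrors the two outputs shown above.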

Upvotes: 2

user3743222

Reputation: 18665

This question has an answer here: how-to-limit-the-concurrency-of-flatmap. You can also review the answer here: Fire async request in parallel but get result in order using rxjs.

Basically it revolves around using the merge(withMaxConcurrency) operator.

Upvotes: 4
