Reputation: 347
Using p-queue, I can limit how many times I start some asynchronous action (for instance, API requests) in a period of time, and how many of these asynchronous actions can run at once.
It works great, but I feel like I should be able to do the same thing with RxJS. I'm having trouble figuring out how to do so. I'm still pretty new to RxJS, and I have yet to find any examples that do what I'm trying to do.
I see operators such as buffer and throttleTime, and these seem like the way to go, but I'm having difficulty putting all of this information together.
How would I replicate a p-queue configuration of:
{
concurrency: 2 /* at a time */
, intervalCap: 10 /* per every… */
, interval: ( 15 /* seconds */ * 1000 /* milliseconds */)
, carryoverConcurrencyCount: true
}
…using RxJS?
An RxJS solution should:
carryoverConcurrencyCount: "…the task must finish in the given interval or will be carried over into the next interval count."
Full example using p-queue:
// Queue/Concurrency-limit requests
const PQueue = require('p-queue') ;
const requestQueue = new PQueue({
concurrency: 2 /* at a time */
, intervalCap: 10 /* per every… */
, interval: ( 15 /* seconds */ * 1000 /* milliseconds */)
, carryoverConcurrencyCount: true
}) ;
// From https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Math/random
function getRandomInt(min, max) {
min = Math.ceil(min);
max = Math.floor(max);
return Math.floor(Math.random() * (max - min)) + min; //The maximum is exclusive and the minimum is inclusive
}
const queuePromises = (
[...(Array(20)).keys()]
.map(number => requestQueue.add(() => new Promise(
(resolve, reject) => setTimeout(() => resolve(number), getRandomInt(0, /* up to */ 250) /* milliseconds */))
))
) ;
queuePromises.forEach(queuePromise => queuePromise.then(
number => console.log(number, 'resolved')
, error => console.error('Individual Promise error', error)
)) ;
Promise.all(queuePromises).then(
numbers => console.log('all are resolved', ...numbers)
, error => console.error('All Promises error', error)
) ;
Upvotes: 1
Views: 561
Reputation: 17752
I do not know p-queue, but you could look at the mergeMap operator, and in particular at its concurrency parameter. The concurrency parameter defines how many inner Observables mergeMap subscribes to at the same time, which limits how many executions run in parallel.
So the code, starting from your example, could be something like this:
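const { from } = require('rxjs');
const { mergeMap } = require('rxjs/operators');

const concurrency = 1;

function getRandomInt(min, max) {
  min = Math.ceil(min);
  max = Math.floor(max);
  return Math.floor(Math.random() * (max - min)) + min; // The maximum is exclusive and the minimum is inclusive
}

// Build task factories rather than promises: a promise starts its work as soon as
// it is created, so each timer must only be created once mergeMap picks the task up.
const tasks = (
  [...(Array(20)).keys()]
    .map(number => () => new Promise(
      resolve => setTimeout(() => resolve(number), getRandomInt(0, /* up to */ 250) /* milliseconds */)))
);

from(tasks).pipe(
  // At most `concurrency` tasks are in flight at any time
  mergeMap(task => from(task()), concurrency)
)
.subscribe({
  next: number => console.log(number, 'resolved'),
  error: error => console.error('Individual Promise error', error),
  complete: () => console.log('all are resolved')
});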
Setting the value of concurrency to 1 lets you see that the results of the promises arrive sequentially, in order.
Upvotes: 1