rvrabbit
rvrabbit

Reputation: 347

Queue emitted values using RxJS, like it can be done with p-queue

Using p-queue, I can limit how many times I start some asynchronous action (for instance, API requests) in a period of time, and how many of these asynchronous actions can run at once.

It works great, but I feel like I should be able to do the same thing with RxJS. I'm having trouble figuring out how to do so. I'm still pretty new to RxJS, and I have yet to find any examples that do what I'm trying to do.

I see operators such as buffer and throttleTime, and these seem like the way to go, but I'm having difficulty putting all of this information together.

How would I replicate a p-queue configuration of:

{
    concurrency:    2 /* at a time */
    , intervalCap: 10 /* per every… */
    , interval: (  15 /* seconds */ * 1000 /* milliseconds */)
    , carryoverConcurrencyCount: true
}

…using RxJS?

An RxJS solution should:


Full example using p-queue:

// Queue/Concurrency-limit requests
const PQueue = require('p-queue') ;
const requestQueue = new PQueue({
    concurrency:    2 /* at a time */
    , intervalCap: 10 /* per every… */
    , interval: (  15 /* seconds */ * 1000 /* milliseconds */)
    , carryoverConcurrencyCount: true
}) ;

// From https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Math/random
function getRandomInt(min, max) {
  min = Math.ceil(min);
  max = Math.floor(max);
  return Math.floor(Math.random() * (max - min)) + min; //The maximum is exclusive and the minimum is inclusive
}

const queuePromises = (
    [...(Array(20)).keys()]
    .map(number => requestQueue.add(() => new Promise(
        (resolve, reject) => setTimeout(() => resolve(number), getRandomInt(0, /* up to */ 250) /* milliseconds */))
    ))
) ;

queuePromises.forEach(queuePromise => queuePromise.then(
    number => console.log(number, 'resolved') 
    , error => console.error('Individual Promise error', error)
)) ;

Promise.all(queuePromises).then(
    numbers => console.log('all are resolved', ...numbers)
    , error => console.error('All Promises error', error)
) ;

Upvotes: 1

Views: 561

Answers (1)

Picci
Picci

Reputation: 17752

I do not know p-queue but probably you could look at mergeMap operator to accomplish what you want, and in particular to the concurrency parameter of mergeMap. Via concurrency parameter you can define how many parallel executions you can run concurrently.

So the code, starting from your example, could be something like this

const concurrency = 1;
function getRandomInt(min, max) {
  min = Math.ceil(min);
  max = Math.floor(max);
  return Math.floor(Math.random() * (max - min)) + min; //The maximum is exclusive and the minimum is inclusive
}

const queuePromises = (
    [...(Array(20)).keys()]
    .map(number => new Promise(
      (resolve, reject) => setTimeout(() => resolve(number), getRandomInt(0, /* up to */ 250) /* milliseconds */)))
) ;

from(queuePromises).pipe(
  mergeMap(qp => from(qp), concurrency)
)
.subscribe(
  number => console.log(number, 'resolved') 
  , error => console.error('Individual Promise error', error),
  () => console.log('all are resolved')
)

Setting the value of concurrency to 1 allows you to see that actually you have the results of the promises arriving sequentially in an ordered way.

Upvotes: 1

Related Questions