newprogrammer
newprogrammer

Reputation: 620

Rate limiting a queue of API calls and returning the results

I'm looping through an array and making an API call for each member using async/await, I then push the result into another array which is returned.

// My current function
async requestForEach(repos) {
    const result = [];
    for (const repo of repos) {
        result.push(await this.doSomething(repo.name));
    }
    return result;
}

// doSomething()
const AWS = require('aws-sdk');
const codecommit = new AWS.CodeCommit();
async doSomething(repoName){
    return (await codecommit.listBranches({
        repoName
    }).promise()).branches;
}

My issue is I'm getting rate limited. If I catch and print the error I get..

ThrottlingException: Rate exceeded {
  // Call stack here
  code: 'ThrottlingException',
  time: 2020-08-16T15:52:56.632Z,
  requestId: '****-****-****-****-****',
  statusCode: 400,
  retryable: true
}

Documentation for the API I'm using can be found here - https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/CodeCommit.html#listBranches-property

I looked into options and this async library seemed to be the popular option.

Using async.queue()..

Tasks added to the queue are processed in parallel (up to the concurrency limit). If all workers are in progress, the task is queued until one becomes available. Once a worker completes a task, that task's callback is called.

// create a queue object with concurrency 2
var q = async.queue(function(task, callback) {
    console.log('hello ' + task.name);
    callback();
}, 2);

Obviously I cant get the value back from within the callback function, so how should I approach this problem?

Upvotes: 0

Views: 2311

Answers (3)

Prathap Reddy
Prathap Reddy

Reputation: 1749

You can make use of Promise.all as below to reduce the wait time for your API calls as below

async requestForEach(repos) {
  return Promise.all(repos.map(repo => this.doSomething(repo.value)));
}

Since you are getting the rate limit issue with total number of calls, you can make use of libraries like es6-promise-pool to manage concurrent requests (5/10 - based on your requirement).

And update the this.doSomething with recursion and MAX_RETRIES (Control the MAX_RETRIES from environment variable) limit as below

async doSomething(repoName, retries = 0) {
    try {
        const data = await codecommit.listBranches({
            repoName
        }).promise();
        return data.branches;
    } catch(err) {
        if (err.code == 'ThrottlingException' && retries <= MAX_RETRIES) {
            await delay(err.retryDelay ?? 1000); // As per @Bergi's answer
            await doSomething(repoName, retries + 1); // Recursive call
        } else {
            console.log('Issue with repo: ', repoName);
            throw err; // (Or) return ''; based on requirement
        }
    }
}


// Filter out the valid results at the end - Applicable only if you use return '';
const results = await requestForEach(repos);
const finalResults = results.filter(Boolean);

This approach might help you to reduce the wait time in production over looping every request in sequence.

Upvotes: 0

Bergi
Bergi

Reputation: 665276

The sequential for … of loop looks good to me. You can add a default delay for each iteration to make it slower, but you can also simply retry requests later when they fail because of throttling. Notice that this approach only works well when you have only a single source of requests in your app (not multiple concurrent calls to requestForEach), otherwise you'd probably need global coordination.

async doSomething(repoName) {
    while (true) {
        try {
            const data = await codecommit.listBranches({
                repoName
            }).promise();
            return data.branches;
        } catch(err) {
            if (err.code == 'ThrottlingException') { // if (err.retryable) {
                await delay(err.retryDelay ?? 1000);
                continue;
            } else {
                throw err;
            }
        }
    }
}
function delay(time) {
    return new Promise(resolve => {
        setTimeout(resolve, time);
    });
}

Instead of the while (true) loop a recursive approach might look nicer. Notice that in production code you'll want to have a limit on the number of retries so that your loop never runs infinitely.

Upvotes: 2

CountZero
CountZero

Reputation: 6389

Looks like you want parallelLimit.

It takes an optional callback which receives the results.

From the docs.

https://caolan.github.io/async/v3/docs.html#parallelLimit

callback function An optional callback to run once all the functions have completed successfully. This function gets a results array (or object) containing all the result arguments passed to the task callbacks. Invoked with (err, results).

Example:

// run 'my_task' 100 times, with parallel limit of 10

  var my_task = function(callback) { ... };
  var when_done = function(err, results) { ... };

  // create an array of tasks
  var async_queue = Array(100).fill(my_task);

  async.parallelLimit(async_queue, 10, when_done);

Taken from: how to use async.parallelLimit to maximize the amount of (paralle) running processes?

Upvotes: 1

Related Questions