user3652705

Reputation: 23

Run HTTP requests in chunks

I want to run a large number of HTTP requests (~100k) in configurable chunks, with a configurable timeout between chunks. The requests are built from the data in a .csv file.

It doesn't work: I am getting a TypeError, and when I remove the () after f, it doesn't work either. I would be very grateful for a little help. The biggest problem is probably that I don't really understand exactly how promises work; I tried multiple solutions but wasn't able to achieve what I want.

The timeout feature will probably give me an even bigger headache, so I would appreciate any tips on that too.

Can you please help me to understand why it doesn't work?

Here is the snippet:

const rp = require('request-promise');
const fs = require('fs');
const { chunk } = require('lodash');

const BATCH_SIZE = 2;
const QUERY_PARAMS = ['clientId', 'time', 'changeTime', 'newValue'];

async function update(id, time, query) {
    const options = {
        method: 'POST',
        uri: `https://requesturl/${id}?query=${query}`,
        body: {
            "prop": {
                "time": time
            }
        },
        headers: {
            "Content-Type": "application/json"
        },
        json: true
    }

    return async () => { return await rp(options) };
}

async function batchRequestRunner(data) {
    const promises = [];
    for (row of data) {
        row = row.split(',');
        promises.push(update(row[0], row[1], QUERY_PARAMS.join(',')));
    }
    const batches = chunk(promises, BATCH_SIZE);

    for (let batch of batches) {
        try {
            Promise.all(
                batch.map(async f => { return await f();})
            ).then((resp) => console.log(resp));
        } catch (e) {
            console.log(e);
        }
    }
}

async function main() {
    const input = fs.readFileSync('./input.test.csv').toString().split("\n");
    const requestData = input.slice(1);

    await batchRequestRunner(requestData);
}

main();

Clarification for the first comment:

I have a csv file which looks like below:

clientId,startTime
123,13:40:00
321,13:50:00

The file is ~100k rows; each row says how to update the time for a particular clientId in the database. I don't have access to the database, but I do have access to an API that allows updating entries in it. I cannot make 100k calls at once because my network is limited (I work remotely because of the coronavirus), it consumes a lot of memory, and the API may also be rate-limited and could crash if I make all the requests at once.

What I want to achieve: send the requests in configurable batches, with a configurable delay between batches, so that neither my connection nor the API gets overwhelmed.

Upvotes: 2

Views: 7565

Answers (1)

jfriend00

Reputation: 707238

Well, it seems like this is a somewhat classic case where you want to process an array of values with some asynchronous operation and, to avoid consuming too many resources or overwhelming the target server, you want no more than N requests in flight at the same time. This is a common problem for which there are pre-built solutions. My go-to solution is a small piece of code called mapConcurrent(). It's analogous to array.map(), but it takes a promise-returning asynchronous callback and the maximum number of items that should ever be in flight at the same time, and it returns a promise that resolves to an array of results.

Here's mapConcurrent():

// takes an array of items and a function that returns a promise
// returns a promise that resolves to an array of results
function mapConcurrent(items, maxConcurrent, fn) {
    let index = 0;
    let inFlightCntr = 0;
    let doneCntr = 0;
    let results = new Array(items.length);
    let stop = false;

    return new Promise(function(resolve, reject) {

        function runNext() {
            let i = index;
            ++inFlightCntr;
            fn(items[index], index++).then(function(val) {
                ++doneCntr;
                --inFlightCntr;
                results[i] = val;
                run();
            }, function(err) {
                // set flag so we don't launch any more requests
                stop = true;
                reject(err);
            });
        }

        function run() {
            // launch as many as we're allowed to
            while (!stop && inFlightCntr < maxConcurrent && index < items.length) {
                runNext();
            }
            // if all are done, then resolve parent promise with results
            if (doneCntr === items.length) {
                resolve(results);
            }
        }

        run();
    });
}
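
Note the difference from the chunked Promise.all() approach in your question: mapConcurrent() keeps up to maxConcurrent requests in flight continuously, starting the next one as soon as any request finishes, so one slow request never stalls a whole batch.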

Your code can then be structured to use it like this:

function update(id, time, query) {
    const options = {
        method: 'POST',
        uri: `https://requesturl/${id}?query=${query}`,
        body: {
            "prop": {
                "time": time
            }
        },
        headers: {
            "Content-Type": "application/json"
        },
        json: true
    }
    return rp(options);
}

function processRow(row) {
    let rowData = row.split(",");
    // the csv only has clientId and startTime, so pass the query string
    // the same way the original code did
    return update(rowData[0], rowData[1], QUERY_PARAMS.join(","));
}


function main() {
    const input = fs.readFileSync('./input.test.csv').toString().split("\n");
    const requestData = input.slice(1);    

    // process this entire array with up to 5 requests "in-flight" at the same time
    mapConcurrent(requestData, 5, processRow).then(results => {
        console.log(results);
    }).catch(err => {
        console.log(err);
    });
}

You can obviously adjust the number of concurrent requests to whatever you want; I set it to 5 in this example.
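
As for the TypeError in your original code: update() is declared async, so calling it returns a promise that resolves to your inner function, not the function itself. That means the f in batch.map(async f => { return await f(); }) is a promise, and f() throws TypeError: f is not a function. If you remove the (), Promise.all() just resolves to an array of the inner functions without ever calling them, so no requests go out in that case either. Also note that your for (let batch of batches) loop never awaits the Promise.all(), so all batches fire at once and the surrounding try/catch can never catch a rejection.

If you specifically want the configurable pause between chunks you asked about, it can be bolted onto your original chunked structure. Here's a minimal sketch (delay and runInChunks are names I made up; it reuses lodash's chunk and the processRow() above):

function delay(ms) {
    return new Promise(resolve => setTimeout(resolve, ms));
}

async function runInChunks(rows, batchSize, delayMs) {
    const results = [];
    // lodash chunk, as in your original code
    for (const batch of chunk(rows, batchSize)) {
        // wait for the whole batch to finish before starting the next one
        results.push(...await Promise.all(batch.map(processRow)));
        // configurable pause between chunks
        await delay(delayMs);
    }
    return results;
}

Keep in mind this pattern waits for the slowest request in every batch, so mapConcurrent() above will generally give you better throughput for the same concurrency limit.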

Upvotes: 2
