mupersan82

Reputation: 567

Batch requests in Node.js

My program communicates with a web service that only accepts ~10 requests per second. From time to time, it fires off 100+ concurrent requests to the web service, which causes my program to crash.

How do I limit concurrent requests in Node.js to 5 per second? I'm using the request library.

// IF EVENT AND SENDER
if (data.sender[0].events && data.sender[0].events.length > 0) {

    // FIND ALL EVENTS
    for (var i = 0; i < data.sender[0].events.length; i++) {

        // IF TYPE IS "ADDED"
        if (data.sender[0].events[i].type == "added") {
            switch (data.sender[0].events[i].link.rel) {
                case "contact":
                    batch("added", data.sender[0].events[i].link.href);
                    //_initContacts(data.sender[0].events[i].link.href);
                    break;
            }
        // IF TYPE IS "UPDATED"
        } else if (data.sender[0].events[i].type == "updated") {

            switch (data.sender[0].events[i].link.rel) {
                case "contactPresence":
                    batch("updated", data.sender[0].events[i].link.href);
                    //_getContactPresence(data.sender[0].events[i].link.href);
                    break;
                case "contactNote":
                    batch("updated", data.sender[0].events[i].link.href);
                    //_getContactNote(data.sender[0].events[i].link.href);
                    break;
                case "contactLocation":
                    batch("updated", data.sender[0].events[i].link.href);
                    //_getContactLocation(data.sender[0].events[i].link.href);
                    break;
                case "presenceSubscription":
                    batch("updated", data.sender[0].events[i].link.href);
                    //_extendPresenceSubscription(data.sender[0].events[i].link.href);
                    break;
            }
        }
    }
}

And then the homegrown batch method:

var updated = [];
var added = [];

var batch = function (type, url) {
    console.log("batch called");

    if (type === "added") {
        console.log("Added batched");
        added.push(url);
        if (added.length > 5) {
            // Snapshot the batch and clear it before the timer fires
            var addedBatch = added;
            added = [];
            setTimeout(function () {
                addedBatch.forEach(function (req) {
                    _initContacts(req);
                });
            }, 2000);
        }
    } else if (type === "updated") {
        console.log("Updated batched");
        updated.push(url);
        console.log("Updated length is : ", updated.length);
        if (updated.length > 5) {
            console.log("Over 5 updated events");
            var updatedBatch = updated;
            updated = [];
            updatedBatch.forEach(function (req) {
                setTimeout(function () {
                    _getContactLocation(req);
                }, 2000);
            });
        }
    }
};

And an example of the actual request:

var _getContactLocation = function(url){
    r.get(baseUrl + url, 
    { "strictSSL" : false, "headers" : { "Authorization" : "Bearer " + accessToken }}, 
        function(err, res, body){
            if(err)
                console.log(err);
            else {
                var data = JSON.parse(body);
                self.emit("data.contact", data);
            }
        }
    );
};

Upvotes: 5

Views: 16524

Answers (2)

josh3736

Reputation: 145172

This is precisely what node's Agent class is designed to address. Have you done something silly like require('http').globalAgent.maxSockets = Number.MAX_VALUE or passed agent: false as a request option?

With Node's default behavior, your program will not send more than 5 concurrent requests at a time. Additionally, the Agent provides optimizations that a simple queue cannot (namely HTTP keepalives).

If you try to make many requests (for example, issue 100 requests from a loop), the first 5 will begin and the Agent will queue the remaining 95. As each request completes, the Agent starts the next one from its queue.

What you probably want to do is create an Agent for your web service requests, and pass it in to every call to request (rather than mixing requests in with the global agent).

var http = require('http'),
    svcAgent = new http.Agent();

request({ ..., agent: svcAgent });
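
For example, here is a minimal sketch of that idea adapted from the question's _getContactLocation (the strictSSL option suggests the service is HTTPS, so an https.Agent is assumed; maxSockets makes the 5-connection cap explicit rather than relying on the default):

var https = require('https');
var r = require('request');

// Dedicated agent for the web service: at most 5 sockets are opened to it,
// and any further requests wait in the agent's queue until a socket frees up.
var svcAgent = new https.Agent({ maxSockets: 5 });

var _getContactLocation = function (url) {
    r.get(baseUrl + url, {
        "strictSSL": false,
        "agent": svcAgent,
        "headers": { "Authorization": "Bearer " + accessToken }
    }, function (err, res, body) {
        if (err)
            console.log(err);
        else
            self.emit("data.contact", JSON.parse(body));
    });
};

With a shared agent like this, every call funnels through the same 5-socket pool, so no extra batching code is needed.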

Upvotes: 6

George

Reputation: 4257

Using the async library, the mapLimit function does exactly what you want. I can't provide an example for your specific use case as you did not provide any code.

From the readme:


mapLimit(arr, limit, iterator, callback)

The same as map only no more than "limit" iterators will be simultaneously running at any time.

Note that the items are not processed in batches, so there is no guarantee that the first "limit" iterator functions will complete before any others are started.

Arguments

  • arr - An array to iterate over.
  • limit - The maximum number of iterators to run at any time.
  • iterator(item, callback) - A function to apply to each item in the array. The iterator is passed a callback(err, transformed) which must be called once it has completed with an error (which can be null) and a transformed item.
  • callback(err, results) - A callback which is called after all the iterator functions have finished, or an error has occurred. Results is an array of the transformed items from the original array.

Example

async.mapLimit(['file1','file2','file3'], 1, fs.stat, function(err, results){
    // results is now an array of stats for each file
});
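
If all of the URLs were known up front, a sketch along these lines could be dropped in (hrefs here is a hypothetical list; baseUrl and accessToken are the question's variables):

var async = require('async');
var r = require('request');

// Hypothetical list of hrefs collected from the events, all known in advance
var hrefs = ['/contacts/1', '/contacts/2', '/contacts/3'];

async.mapLimit(hrefs, 5, function (href, callback) {
    r.get(baseUrl + href,
        { "strictSSL": false, "headers": { "Authorization": "Bearer " + accessToken } },
        function (err, res, body) {
            callback(err, err ? null : JSON.parse(body));
        });
}, function (err, contacts) {
    // contacts holds the parsed responses; at most 5 requests were in flight at once
});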


EDIT: Now that you've provided code, I see that your use case is a bit different from what I assumed. The async library is more useful when you know all the tasks to run up front. I don't know of a library offhand that will easily solve this for you. The note above is likely still relevant to people searching this topic, so I'll leave it in.

Sorry, I don't have time to restructure your code, but this is an (untested) example of a function that makes asynchronous requests while throttling itself to at most 5 concurrent requests. I would highly recommend working off of this to come up with a more general solution that fits your code base.

var throttledRequest = (function () {
    var queue = [],   // URLs waiting to be requested
        running = 0;  // number of requests currently in flight

    // Start requests from the queue until 5 are in flight or the queue is empty
    function sendPossibleRequests() {
        var url;
        while (queue.length > 0 && running < 5) {
            url = queue.shift();
            running++;
            r.get(url, { /* YOUR OPTIONS HERE*/ }, function (err, res, body) {
                // A slot has freed up; try to start the next queued request
                running--;
                sendPossibleRequests();

                if (err)
                    console.log(err);
                else {
                    var data = JSON.parse(body);
                    self.emit("data.contact", data);
                }
            });
        }
    }

    // Callers just enqueue a URL; the closure decides when it is actually sent
    return function (url) {
        queue.push(url);
        sendPossibleRequests();
    };
})();

Basically, you keep a queue of all the data to be processed asynchronously (such as URLs to be requested), and after each request's callback fires you try to launch as many of the remaining requests as possible.
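
Used in place of the question's batch calls (a hypothetical adaptation; the URL expression comes from the question's event loop), it would look like:

// Each event's href goes straight to the throttled wrapper instead of a batch array
throttledRequest(baseUrl + data.sender[0].events[i].link.href);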

Upvotes: 11
