William Fortin

Reputation: 777

NodeJS async queue too fast (Slowing down async queue method)

I have an HTTP GET request, and I want to parse the response and save it to my database.

If I call crawl(i) independently, I get good results. But I have to call crawl() for ids 1 to 2000, and then some responses seem to get lost and some responses are duplicates. I don't think I understand how to call thousands of asynchronous functions. I am using the async module's queue function, but so far I am still missing some data and still have some duplicates. What am I doing wrong here? Thanks for your help.

What I am crawling

My Node functions:

function getOptions(i) {
    return {
        host: 'magicseaweed.com',
        path: '/syndicate/rss/index.php?id=' + i + '&unit=uk',
        method: 'GET'
    };
}

function crawl(i) {
    var req = http.request(getOptions(i), function (res) {
        res.on('data', function (body) {
            parseLocation(body);
        });
    });
    req.end();
}

function parseLocation(body){
    parser.parseString(body, function(err, result) {
        if(result && typeof result.rss != 'undefined') {
            var locationTitle = result.rss.channel[0].title;
            var locationString = result.rss.channel[0].item[0].link[0];
            var location = new Location({
                id: locationString.split('/')[2],
                name: locationTitle
            });
            location.save();
        }
    });
}

var N = 2; // number of simultaneous tasks
var q = async.queue(function (task, callback) {
    crawl(task.url);
    callback();
}, N);


q.drain = function () {
    console.log('Crawling done.');
};

for (var i = 0; i < 100; i++) {
    q.push({ url: 'http://magicseaweed.com/syndicate/rss/index.php?id=' + i + '&unit=uk' });
}

[EDIT] Well, after a lot of testing, it seems that the service I am crawling cannot handle so many requests that fast, because when I make each request sequentially, I get all the good responses.
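
For reference, the sequential run looked roughly like this (a sketch, assuming a hypothetical variant of crawl that takes a completion callback):

// hypothetical sequential driver: request id i + 1 only after id i completes
function crawlSequentially(i, max) {
    if (i > max) {
        console.log('Crawling done.');
        return;
    }
    crawl(i, function () {
        crawlSequentially(i + 1, max);
    });
}

crawlSequentially(1, 2000);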

Is there a way to SLOW DOWN the async queue method?

Upvotes: 5

Views: 21329

Answers (5)

CodeFriendly

Reputation: 1

A little late, but I have found this works! Using async, you can slow down the queue by using whilst inside the task handler, e.g.:

var q = async.priorityQueue(function (task, callback) {
    // your processing code here for each task;
    // when ready to complete the task, delay it by calling
    var count = 0;
    async.whilst( // wait 10 seconds (10 ticks of 1000 ms)
        function () {
            return count < 10;
        },
        function (next) {
            count++;
            setTimeout(function () {
                next(null, count);
            }, 1000);
        },
        function (err, n) {
            // n seconds have passed
            callback(); // signal the queue handler that this task is done
        }
    ); // whilst
}, 5);
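
The same per-task delay can also be written with a single setTimeout; a simpler sketch of the same idea:

var q = async.priorityQueue(function (task, callback) {
    // ... process the task here ...
    setTimeout(callback, 10000); // hold the queue slot for 10 seconds before completing
}, 5);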

Upvotes: 0

jmunsch

Reputation: 24089

I know I am a little late to the question; however, here is a solution I wrote to slow down the number of requests when testing an API endpoint, using Node 4 or Node 5:

var fs = require('fs');
var supertest = require('supertest');
var request = supertest("http://sometesturl.com/api/test/v1/")
var Helper = require('./check.helper');
var basicAuth = Helper.basicAuth;
var options = Helper.options;

fs.readFile('test.txt', function(err, data){
  var parsedItems = JSON.parse(data);
  var urlparts = []
  // create a queue
  for (let year = 1975; year < 2016; year++) { // plain loop; range() is not built into Node
    for (var make in parsedItems[year]){
      console.log(year, make, '/models/' + year + '/' + make)
      urlparts.push({urlpart:'/models/' + year + '/' + make, year: year, make: make})
    }
  }
  // start dequeue
  waitDequeue();

  // This function calls itself after the makeRequest promise completes
  function waitDequeue(){
    var item = urlparts.pop()
    if (item){
      makeRequest(item)
        .then(function(){
          // wait this time before next dequeue
          setTimeout(function() {
            waitDequeue();
          }, 3000);
        })
    } else {
      write(parsedItems)
    }
  }

  // make a request, mutate parsedItems then resolve
  function makeRequest(item){
    return new Promise((resolve, reject)=>{
      request
        .get(item.urlpart)
        .set(options.auth[0], options.auth[1])
        .set(options.type[0], options.type[1])
        .end(function(err, res) {
          if (err) return reject(err);
          console.log(res.body)
          res.body.forEach(function(model){
            parsedItems[item.year][item.make][model] = {}
          });
          resolve()
        })
      })
  }

  // write the results back to the file
  function write(parsedItems){
    fs.writeFile('test.txt', JSON.stringify(parsedItems, null, 4), function(err){
      console.log(err)
    })
  }

})

Upvotes: 0

Lockless

Reputation: 497

Another option if you want it: vanilla JS, without fancy libraries.

var incrementer = 0;
var resultsArray = [];

var myInterval = setInterval(function () {
    incrementer++;
    if (incrementer === 100) {
        clearInterval(myInterval);
        // when done, parse the results array
    }
    // make the request here
    // push the request result to resultsArray here
}, 500);

This invokes the function every half second: an easy way to serialize the requests and stop after x of them.
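
Filled in with the crawl call from the question, it might look like this (a sketch; results collection is omitted since crawl saves to the database itself):

var incrementer = 0;

var myInterval = setInterval(function () {
    incrementer++;
    if (incrementer === 100) {
        clearInterval(myInterval);
    }
    crawl(incrementer); // one request per tick, half a second apart
}, 500);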

Upvotes: 1

Aaron Wang

Reputation: 1091

var q = async.queue(function (task, callback) {
    crawl(task.url);
    callback();
}, N);

You're executing the next task immediately after starting the previous one; this way, the queue is meaningless. You should modify your code like this:

// first, modify your 'crawl' function to take a callback argument, and call this callback after the job is done.

// then
var q = async.queue(function (task, next /* naming this argument 'next' is more meaningful */) {
    crawl(task.url, function () {
        // after this one is done, start the next one
        next();
    });
    // or, more simply: crawl(task.url, next);
}, N);
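
For completeness, the modified crawl might look like this (a sketch; it also buffers the response chunks before parsing, since 'data' can fire more than once per response):

function crawl(i, done) {
    var req = http.request(getOptions(i), function (res) {
        var body = '';
        res.on('data', function (chunk) {
            body += chunk; // accumulate partial chunks
        });
        res.on('end', function () {
            parseLocation(body);
            done(); // tell the queue this task is finished
        });
    });
    req.on('error', done); // don't stall the queue on a failed request
    req.end();
}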

Upvotes: 10

Mustafa

Reputation: 10413

You should have a look at this great module, async, which simplifies async tasks like this. You can use its queue; a simple example:

var N = 5; // number of simultaneous tasks
var q = async.queue(function (task, callback) {
    somehttprequestfunction(task.url, function () {
        callback();
    });
}, N);


q.drain = function () {
    console.log('all items have been processed');
};

for (var i = 0; i < 2000; i++) {
    q.push({ url: 'http://somewebsite.com/' + i + '/feed/' });
}

The queue keeps a window of ongoing tasks, and a slot opens up for a future task only when you invoke the callback. The difference is that your current code opens 2000 connections immediately, so the failure rate is obviously high. Limiting it to a reasonable value (5, 10, or 20, depending on the site and the connection) will result in a better success rate. If a request fails, you can always try it again, or push the task to another async queue for another trial. The key point is to invoke callback() in the queue function, so that a slot becomes available when a task is done.
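
A minimal sketch of the retry idea, assuming somehttprequestfunction reports failures through an err argument (a hypothetical signature):

var q = async.queue(function (task, callback) {
    somehttprequestfunction(task.url, function (err) {
        if (err && (task.retries || 0) < 3) {
            task.retries = (task.retries || 0) + 1;
            q.push(task); // re-queue the failed task for another attempt
        }
        callback(); // always free the slot, success or failure
    });
}, 5);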

Upvotes: 18
