Zhe Hu

Reputation: 4007

Node.js/async.js: processing a dynamic queue in parallel

I have one process that generates data files in a folder, roughly one new file every 10 seconds.

I have a separate Node.js watcher that monitors the directory for new files as they come in:

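        const chokidar = require('chokidar');

        const watcher = chokidar.watch(['data_folder']);

        watcher.on('add', (path, stats) => {
            // chokidar passes stats only when available; the alwaysStat option makes it always provide them
            if (stats && stats.size > 0) {
                console.log(path);
                // spawn a child_process to do further processing (spawn_child_process_to_run is defined elsewhere)
                spawn_child_process_to_run(path);
            }
        });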

Each new file is then processed by a child process, which can take quite a long time to finish.

The question is how to queue the files so that they are processed in parallel without exceeding a limit on the number of concurrently running Node.js child processes.
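Stripped of any library, the requirement boils down to a FIFO list of waiting files plus a counter of running workers: start a task only while fewer than `concurrency` are running, and start the next waiting task whenever one finishes. A minimal callback-style sketch follows, assuming a worker signature of `(task, done)`; the names `createQueue`, `push` and `onDrain` are hypothetical and not taken from any library, and a `setTimeout` stands in for the long-running child process.

```javascript
// Hypothetical dependency-free bounded queue (not the async library's code).
function createQueue(worker, concurrency, onDrain) {
  const waiting = []; // FIFO of tasks not yet started
  let running = 0;    // number of workers currently in progress

  function next() {
    // Start tasks while there is spare capacity and work waiting.
    while (running < concurrency && waiting.length > 0) {
      const task = waiting.shift();
      running++;
      worker(task, () => {
        running--;
        if (running === 0 && waiting.length === 0 && onDrain) {
          onDrain(); // everything pushed so far has been processed
        } else {
          next(); // a slot was freed: start the next waiting task, if any
        }
      });
    }
  }

  return {
    push(task) {
      waiting.push(task);
      next();
    },
  };
}

// Usage: 8 simulated jobs, at most 4 running at a time.
let active = 0;
let maxActive = 0;
const finished = [];
const q = createQueue((task, done) => {
  active++;
  maxActive = Math.max(maxActive, active);
  setTimeout(() => { // stand-in for a long-running child process
    active--;
    finished.push(task);
    done();
  }, 20);
}, 4, () => console.log('all items have been processed; max parallel:', maxActive));

for (let i = 1; i <= 8; i++) q.push(i);
```

Because `push` can be called at any time, the same object works for a dynamic source such as a directory watcher: each new path is pushed as it arrives and starts immediately if a slot is free.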

Upvotes: 0

Views: 1626

Answers (2)

Zhe Hu

Reputation: 4007

With the help of async.queue, which runs at most a given number of workers (here 4) at the same time:

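var async = require('async');
var exec = require('child_process').exec;

// Worker: receives one queued item at a time; at most 4 run concurrently.
var q = async.queue(function (path, callback) {
    console.log('hello ' + path);
    // Simulate roughly 5-6 s of processing (Windows-only command; ">nul"
    // discards the output, so stdout is empty).
    exec('ping 127.0.0.1 -n 6 >nul', function (err, stdout, stderr) {
        callback(err); // tell the queue this task is finished
    });
}, 4);

// Called once the queue is empty and all workers have finished.
// async v3+: register with q.drain(fn); in async v2 this was q.drain = fn.
q.drain(function () {
    console.log('all items have been processed');
});

// This callback runs once per item, after that item has been processed.
q.push([1, 2, 3, 4, 5, 6, 7, 8], function (err) {
    console.log('done');
});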
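The `ping ... >nul` command only works on Windows and produces no output. A more portable stand-in, sketched here under the assumption that the real processing can be launched as a separate program, is to run the current `node` binary through `child_process.execFile` and report errors back through the callback. `processFile` is a hypothetical name, and the inline `-e` script is only a placeholder for the real job; its `(path, callback)` signature is what `async.queue` expects of a worker, so it could be passed as `async.queue(processFile, 4)`.

```javascript
const { execFile } = require('child_process');

// Hypothetical worker: runs one child process per file and signals
// completion (or failure) through the callback.
function processFile(path, callback) {
  // Placeholder job: a child Node.js process that waits 100 ms and echoes the path.
  // With "node -e", process.argv[1] is the first extra argument (here: path).
  const script = 'setTimeout(() => console.log("processed " + process.argv[1]), 100)';
  execFile(process.execPath, ['-e', script, path], (err, stdout) => {
    if (err) return callback(err); // a non-zero exit or spawn failure ends up here
    callback(null, stdout.trim());
  });
}

// Stand-alone check, without the async library:
processFile('data_folder/a.txt', (err, result) => {
  if (err) throw err;
  console.log(result); // processed data_folder/a.txt
});
```

Passing the error to the callback, rather than swallowing it, lets the queue report failed files through the `push` callback while still freeing the slot for the next file.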

Upvotes: 0

Jagdish Idhate

Reputation: 7742

You can use the async library.

async.cargo will be useful; more info here & here

Creates a cargo object with the specified payload. Tasks added to the cargo will be processed altogether (up to the payload limit). If the worker is in progress, the task is queued until it becomes available. Once the worker has completed some tasks, each callback of those tasks is called. Check out these animations for how cargo and queue work.

While queue passes only one task to one of a group of workers at a time, cargo passes an array of tasks to a single worker, repeating when the worker is finished.
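To make the difference concrete, here is a deliberately simplified, hypothetical re-implementation of the cargo idea (not the async library's actual code; `createCargo` is an illustrative name): tasks pushed while the single worker is busy accumulate, and the worker receives them as one array of at most `payload` items.

```javascript
// Simplified, hypothetical cargo: one worker, batches of up to `payload` tasks.
function createCargo(worker, payload) {
  const waiting = [];
  let busy = false;

  function next() {
    if (busy || waiting.length === 0) return;
    busy = true;
    const batch = waiting.splice(0, payload); // take up to `payload` tasks
    worker(batch, () => {
      busy = false;
      next(); // the next batch starts only after the whole batch is done
    });
  }

  return {
    push(task) {
      waiting.push(task);
      // Defer so that tasks pushed in the same tick can be grouped into one batch.
      process.nextTick(next);
    },
  };
}

const batches = [];
const cargo = createCargo((tasks, done) => {
  batches.push(tasks);
  setTimeout(done, 10); // simulate processing the whole batch
}, 2);

['a.txt', 'b.txt', 'c.txt', 'd.txt', 'e.txt'].forEach((f) => cargo.push(f));
setTimeout(() => console.log(JSON.stringify(batches)), 100);
// [["a.txt","b.txt"],["c.txt","d.txt"],["e.txt"]]
```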

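var chokidar = require('chokidar');
var async = require('async');

// The worker receives an array of up to 2 queued paths (the payload) and
// processes them in parallel; the next batch starts once all of them finish.
var cargo = async.cargo(function (tasks, callback) {
    async.each(tasks, function (task, cb) {
        console.log('spawn_child_process_to_run(path);', task); // placeholder for the real work
        cb();
    }, callback);
}, 2); // payload: maximum number of tasks per batch

const watcher = chokidar.watch(['data_folder']);

watcher.on('add', (path, stats) => {
    if (stats && stats.size > 0) {
        cargo.push(path); // queue the new file
    }
});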
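Because the second argument of async.cargo is a batch size rather than a concurrency limit, one slow file holds back the whole next batch. A small back-of-the-envelope simulation illustrates this, assuming hypothetical processing times of 6, 1, 6 and 1 seconds, all four files already waiting, and no spawn overhead; it compares a cargo with payload 2 against a queue with concurrency 2.

```javascript
// Hypothetical per-file processing times, in seconds, in arrival order.
const durations = [6, 1, 6, 1];

// Cargo with payload 2: batches run one after another; each batch runs its
// tasks in parallel (as with async.each) and lasts as long as its slowest task.
function cargoTotal(times, payload) {
  let total = 0;
  for (let i = 0; i < times.length; i += payload) {
    total += Math.max(...times.slice(i, i + payload));
  }
  return total;
}

// Queue with concurrency 2: each task goes to whichever worker frees up first.
function queueTotal(times, concurrency) {
  const freeAt = new Array(concurrency).fill(0); // time each worker becomes free
  for (const t of times) {
    const w = freeAt.indexOf(Math.min(...freeAt)); // earliest-free worker
    freeAt[w] += t;
  }
  return Math.max(...freeAt);
}

console.log('cargo:', cargoTotal(durations, 2)); // cargo: 12
console.log('queue:', queueTotal(durations, 2)); // queue: 7
```

With uneven processing times, the queue keeps both slots busy, whereas the cargo waits for the 6-second file in each batch; with equal processing times the two behave about the same.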

Upvotes: 1
