Alex
Alex

Reputation: 11137

Using npm async to run in parallel async methods and return a unified response

I have 2 async methods that can run independently of each other. I would like to call a callback once both have finished. I have tried using async.parallel() (from the npm async package), but it seems to be meant for non-async methods. How can I implement this?

Here is my async.parallel() call; note that asyncTasks is my array of functions, each of which is async.

// Runs every function in asyncTasks through async.parallel; whatever value the
// completion handler receives is logged as JSON and then passed on to
// `callback`, wrapped in a RequestResponse.
// NOTE(review): async's completion handlers are normally declared as
// (err, results); the single parameter here occupies the first slot — confirm
// that this is what the tasks are expected to feed it.
async.parallel(asyncTasks, function (finalPayload) {
  var serializedPayload = JSON.stringify(finalPayload);
  console.log("--------->>>>> message: " + serializedPayload);
  console.log("");
  var response = new RequestResponse(true, finalPayload);
  callback(response);
});

In short, what I really want is a way to execute multiple async methods in parallel, and to consider each method finished when the callback passed to it is invoked.


UPDATE

For a better understanding, I've included the two functions I am using.

getGroups

/**
 * Loads the groups the current user (`userId`) is a member of, the members of
 * those groups and the events attached to them, appending each result set to
 * the shared `responseData` array as `{groups}`, `{groupsMembers}` and
 * `{events}` entries (in that order).
 *
 * The client obtained from `pg.connect` is released with `done()` only after
 * the last query of the chain has come back, and `callback_async_1` is invoked
 * exactly once on every path: success, any query error, a connection error,
 * or "the user has no groups".
 *
 * @param {function} callback_async_1 - completion callback; it is called with
 *     `responseData` as its only argument, on success and on failure alike.
 */
var getGroups = function (callback_async_1) { //get groups + members

    pg.connect(datebasePath, function (err, client, done) {
        if (err) {
            // No usable client was handed out, so there is nothing to query.
            console.error("error...connect " + err);
            callback_async_1(responseData);
            return;
        }

        // Single exit point: give the client back, then report exactly once.
        function finish() {
            done();
            callback_async_1(responseData);
        }

        // Step 1: ids of the groups in which the user is a member.
        function fetchMembershipIds() {
            var membershipQuery = squel.select();
            membershipQuery.from("groups_members");
            membershipQuery.where("user_id = ?", userId);

            console.log("query: " + membershipQuery.toString());

            client.query(membershipQuery.toString(), function (err, result) {
                if (err) {
                    console.error("error...1 " + err);
                    finish();
                    return;
                }

                var groupIds = result.rows.map(function (row) {
                    return row.group_id;
                });

                if (groupIds.length === 0) {
                    // No memberships: stop here instead of running the
                    // "IN ?" queries with an empty list and calling back twice.
                    finish();
                    return;
                }
                fetchGroups(groupIds);
            });
        }

        // Step 2: the group rows themselves.
        function fetchGroups(groupIds) {
            var groupsQuery = squel.select();
            groupsQuery.from("groups");
            groupsQuery.where("id IN ?", groupIds);

            client.query(groupsQuery.toString(), function (err, result2) {
                if (err) {
                    console.error("error...2 " + err);
                    finish();
                    return;
                }
                responseData.push({ //pushing groups into response
                    "groups": result2.rows.slice()
                });
                fetchGroupMembers(groupIds);
            });
        }

        // Step 3: every member of those groups.
        function fetchGroupMembers(groupIds) {
            var membersQuery = squel.select();
            membersQuery.from("groups_members");
            membersQuery.where("group_id IN ?", groupIds);

            client.query(membersQuery.toString(), function (err, result3) {
                if (err) {
                    // Without this check a failed query would crash on result3.rows.
                    console.error("error...members " + err);
                    finish();
                    return;
                }
                responseData.push({
                    "groupsMembers": result3.rows.map(function (row) {
                        return { groupMember: row };
                    })
                });
                fetchEvents(groupIds);
            });
        }

        // Step 4: events that belong to those groups; last query of the chain.
        function fetchEvents(groupIds) {
            var eventsQuery = squel.select();
            eventsQuery.from("events");
            eventsQuery.where("group_id IN ?", groupIds);

            client.query(eventsQuery.toString(), function (err, result4) {
                if (err) {
                    console.error("error...3 " + err);
                } else {
                    responseData.push({
                        "events": result4.rows.slice()
                    });
                }
                finish();
            });
        }

        fetchMembershipIds();
    });
};

getRegisteredContacts

/**
 * Looks up which of the phone numbers in `arrayOfContacts` belong to
 * registered users and, when at least one row comes back, appends a
 * `{registeredContacts: [...]}` entry to the shared `responseData` array.
 *
 * The client obtained from `pg.connect` is released with `done()` once the
 * query has come back (not while it is still in flight), and a failed
 * connection is reported instead of crashing on an undefined client.
 *
 * @param {function} callback_async_2 - completion callback; it is called
 *     exactly once with `responseData` as its only argument, on success and
 *     on failure alike.
 */
var getRegisteredContacts = function (callback_async_2) { // get registered contacts

    pg.connect(datebasePath, function (err, client, done) {
        if (err) {
            // No usable client was handed out, so there is nothing to query.
            console.error(err);
            callback_async_2(responseData);
            return;
        }

        //get contacts that are registered
        var contactsQuery = squel.select();
        contactsQuery.from("users");
        contactsQuery.where("phone_number IN ?", arrayOfContacts);

        client.query(contactsQuery.toString(), function (err, result5) {
            // The query has finished, so the client can go back to the pool.
            done();

            if (err) {
                console.error(err);
            } else if (result5.rows.length > 0) {
                responseData.push({
                    "registeredContacts": result5.rows.slice()
                });
            }
            callback_async_2(responseData);
        });
    });
};

Upvotes: 0

Views: 1576

Answers (2)

Christopher Hackett
Christopher Hackett

Reputation: 6192

Based on your new code listing, the most obvious problem is that done() is called too early in both of your tasks. It needs to look like this:

// Skeleton of the contacts task showing where done() belongs: inside the
// query callback, so the client is released only after the query has
// finished. callback_async_2 is called with no arguments once the query has
// been handled, or with the connection error if pg.connect fails.
var getRegisteredContacts = function (callback_async_2) {
    pg.connect(datebasePath, function (err, client, done) {
        if (err) {
            // Connecting failed, so there is no client to query; report and stop.
            callback_async_2(err);
            return;
        }
        var s = squel.select();
        s.from("users");
        s.where("phone_number IN ?", arrayOfContacts);
        client.query(s.toString(), function (err, result5) {
            done(); // <---- done() to be here
            if (err) {
                //
            } else {
                //
            }
            callback_async_2();
        });
    });
};

You should also lint your code. If you had, you would have noticed that you never check whether pg.connect passed an err to its callback (linting also keeps the code easier to read).

Upvotes: 0

Christopher Hackett
Christopher Hackett

Reputation: 6192

Your task function needs to take a callback parameter, which you then call when the task is done:

// Minimal synchronous task: logs its label, then signals completion by
// invoking the supplied callback with null as its only argument.
var task = function (callback) {
    var label = 'Task';
    console.log(label);
    callback(null);
};

When you do something async within the task, the task would look like this:

// Asynchronous task: logs its label, issues an HTTP GET and signals
// completion from inside the request callback. A failed request is passed to
// the callback as its first argument; a successful one logs the status code
// and calls back with null.
var task = function(callback){
    console.log('Task');
    request.get('http://www.google.com', function (error, response, body){
        if (error) {
            // On failure there is no response object to read a status from.
            callback(error);
            return;
        }
        console.log('Task - ' + response.statusCode);
        callback(null);
    });
};

Example

var async = require('async');
var request = require('request');

// First example task: purely synchronous. Logs its label and immediately
// reports completion by calling the callback with null.
var task1 = function (callback) {
    var label = 'Task 1';
    console.log(label);
    callback(null);
};
// Second example task: asynchronous. Logs its label, issues an HTTP GET and
// reports completion from the request callback. A failed request is handed to
// the callback as its first argument; a successful one logs the status code
// and calls back with null.
var task2 = function(callback){
    console.log('Task 2');
    request.get('http://www.google.com', function (error, response, body){
        if (error) {
            // On failure there is no response object to read a status from.
            callback(error);
            return;
        }
        console.log('Task 2 - ' + response.statusCode);
        callback(null);
    });
};
// The tasks to run; async.parallel starts them all and calls the completion
// handler once every task has called its callback.
var asyncTasks = [task1, task2];

async.parallel(asyncTasks, function(err, result){
    if (err) {
        // A task reported a failure: surface it rather than claiming success.
        console.error(err);
        return;
    }
    console.log('--DONE--');
});

Outputs

Task 1
Task 2
Task 2 - 200
--DONE--

Upvotes: 2

Related Questions