Ankur Chavda
Ankur Chavda

Reputation: 33

Async.eachSeries: Callback already called

This is a little long and messy code, but please bear with me because I need to get this done.

I am trying to update a json object in for each users. I want the loop iterations to wait for the asynchronous process to end in order to avoid race conditions. However, this has led to a callback hell and now I cannot decide what is the right place for each callbacks to be returned.

I referred to this answer on Nesting async.eachSeries and also tried to structure my code according to it. But still it does not work. The code gives callback already called error at callback1().

    async.eachOfSeries(res, function (value, camp, callback3) {
let _id = res[camp]._id;
let arr = res[camp].campaignID;
async.eachOfSeries(arr, function2, function (err) {
    callback3();
})


function function2(value1, i, callback2) {
    let users = arr[i].users;
    let id = arr[i].id;
    let loop = Math.ceil(users / 1000);
    let limit = 0,
        offset = 0;
    for (let j = 0; j < loop; j++) {
        if (users > 1000) {
            limit = 1000;
            users -= limit;
        } else {
            limit = users;
        }
        console.log(limit + " limit " + offset + " offset");
        var start = Date.now();
        while (Date.now() < start + 100) {}
        const request = mailjet
            .get("messagesentstatistics")
            .request({
                "CampaignID": id,
                "AllMessages": true,
                "Limit": limit,
                "Offset": offset
            })
        request
            .then((result) => {
                let data = result.body.Data;
                var loop = 0;
                async.eachOfSeries(data, function1, function (err) {
                    console.log("function");
                    callback2();
                })
                console.log("oooooo");
            })
            .catch((err) => {
                console.log(err);
            })
        offset += limit;
    }

    function function1(value2, val, callback1) {
        console.log(data +" data");
        let jsonObj = data[val];
        let email = jsonObj.ToEmail;
        jsonObj['retailer'] = res[camp].retailer;
        jsonObj['summary'] = 'f';
        let tempObj = {};
        tempObj[id] = jsonObj;
        let options = {
            new: true
        };
        let campId = id;
        User.addCampaignResponse(email, campId, tempObj, options, function (err, results) {
            if (err) {
                throw err;
            } else {
                console.log("aasd");
                Campaign.updateResponse(_id, function (err, results2) {
                    if (err)
                        throw err;
                    else {
                        console.log("asdasaadas");
                        callback1();
                    }
                }) // console.log(results);
            }
        })
    }

}

}, function (err) {
    callback(undefined, "doneeeeee");
})

Is there a better way than this? Can I use waterfall too somewhere? Can I change the callback positions to avoid the error?


EDIT: Simplified code

function function2(value1, i, callback2) {
    // ...
    const request = mailjet
                    .get("messagesentstatistics")
                    .request({
                       // ...
                    });
    request
       .then((result) => {
          // ...
          async.eachOfSeries(data, function1, function (err) {
            callback2();
          });
        })
        .catch((err) => {
          // ...
        });
    }

function function1(value2, val, callback1) {
  // ...
  User.addCampaignResponse(email, campId, tempObj, options, function (err, results) {
    if (err) {
      throw err;
    } else {
      Campaign.updateResponse(_id, function (err, results2) {
        if (err) throw err;
        else callback1();
      });
    }
  });
}

async.eachOfSeries(res, function (value, camp, callback3) {
    // ...

    async.eachOfSeries(arr, function2, function (err) {
      callback3();
    });

  },
  function (err) {
    callback(undefined, "doneeeeee");
  });

Upvotes: 1

Views: 977

Answers (2)

Mikey
Mikey

Reputation: 6766

I would do it like this.

We use if (err) return callback(err); to stop the current async function and send the error to a higher level.

async.eachSeries(res, function (r, callback1) {

    let _id = r._id;
    let arr = r.campaignID;

    async.eachSeries(arr, function firstLevel (a, callback2) {

        let users = a.users;
        let id = a.id;
        let loop = Math.ceil(users / 1000);
        let limit = 0, offset = 0;

        // for loop is synchronous whereas mailjet is asynchronous -> usually bad idea to mix those two
        // instead try async.timesSeries()
        async.timesSeries(loop, function getSentMessages (n, callback3) {

            if (users > 1000) {
                limit = 1000;
                users -= limit;
            } else {
                limit = users;
            }
            console.log(n, limit, "limit", offset, "offset");

            var start = Date.now();
            while (Date.now() < start + 100) {} // this does nothing...

            // async.js doesn't flow well with Promises so request your resource with a callback function
            mailjet
                .get("messagesentstatistics")
                .request({ CampaignID: id, AllMessages: true, Limit: limit, Offset: offset })
                .request(function (err, result, body) {

                // stop everything if an error occurred; send the error back up
                if (err) return callback3(err);

                let data = result.body.Data;
                var loop = 0;

                async.eachSeries(data, secondLevel (jsonObj, callback4) {
                    let email = jsonObj.ToEmail;
                    jsonObj.retailer = r.retailer;
                    jsonObj.summary = 'f';
                    let tempObj = {};
                    tempObj[id] = jsonObj;
                    let options = { new: true };
                    let campId = id;
                    User.addCampaignResponse(email, campId, tempObj, options, function (err, results) {
                        // stop everything if an error occurred; send the error back up
                        if (err) return callback4(err);

                        console.log("added campaign response");

                        Campaign.updateResponse(_id, function (err, results2) {
                            // stop everything if an error occurred; send the error back up
                            if (err) return callback4(err);

                            console.log("updated campaign response");

                            callback4();
                        });
                    })
                }, callback3);

            }); // end of mailjet

            offset += limit;

        }, callback2); // end of async.timesSeries

    }, callback1); // end of async.eachOfSeries

}, function (err) {
    // if an error occurs anywhere, it should back here
    if (err) {
        console.log(err);
        return;
    }
    console.log("doneeeeee");
});

Also it's always better to use meaningful variable and function names.

Upvotes: 1

Orelsanpls
Orelsanpls

Reputation: 23495

Here is the simplified corrected version of it


function secondLevel(value2, val, callback) {
    // ...
    User.addCampaignResponse(email, campId, tempObj, options, function (err, results) {
       if (err) {
          throw err;
       }

       Campaign.updateResponse(_id, function (err, results2) {
          // throw the error to the highest level if there is one
          if (err) throw err;

          // Finish the eachOfSeries if all is ok
          callback();
       });
    });
 }

function firstLevel(value1, i, callback) {
  // ...
  mailjet
    .get("messagesentstatistics")
    .request({
      // ...
    })
    .then((result) => {
      // ...
      async.eachOfSeries(data, secondLevel, function (err) {
        // throw the error to the highest level if there is one
        if (err) throw err;

        // Finish the eachOfSeries if all is ok
        callback();
      });
    })
    .catch((err) => {
      // throw the error to the highest level
      throw err;
    });
}

 //
 // Start of our program
 //
 async.eachOfSeries(res, function (value, camp, callback) {
     // ...
     async.eachOfSeries(arr, firstLevel, function (err) {
       // throw the error to the highest level if there is one
       if (!err) throw err;

       // Finish the eachOfSeries if all is ok
       callback();
     });
 },
 function (err) {
     if (err) {
        // Here we re done with an error
        // ...
        return;
     }

     // We did it, no, error
     // ...
 });

Upvotes: 0

Related Questions