Sam Kennedy
Sam Kennedy

Reputation: 95

NodeJS Batch Mongo operation, wait for callbacks

I'm trying to run a batch operation on a MongoDB database. The idea is to iterate over each user, find other users who are studying the same course or attend the same university, and store information about these matches.

Everything is contained within a loop like this:

User.find({}, function(err, doc){
    if (err) {
        // `doc` cannot be relied on when the query failed, so stop here
        return console.error(err);
    }
    doc.forEach(function(candidate){
        //other find operations in here
        // ...
        // NOTE: forEach does not wait for asynchronous work started here,
        // so the operations for every candidate are started at once.
    });
});

Here 'User' is the model for the collection of users registered on the site. The problem is that `forEach` does not wait for asynchronous callbacks, so the queries for every user are dispatched at once. Instead, I want to wait for all the callbacks for one user to complete before moving on to the next document.

I've tried using the async library, but I can't figure out how to make it work.

How can I process each user one at a time?

Upvotes: 2

Views: 307

Answers (3)

chridam
chridam

Reputation: 103365

This task can be done without dispatching callbacks for each user at all. Use an aggregate operation with a `$lookup` stage to create a "self-join" on the users collection. Then filter the documents with a `$match` stage so that only users who share a course with at least one other user are returned.

For example, given a field called `course`, the following returns every user who shares a course with at least one other user:

// Self-join on the users collection: for every user with a course, attach all
// users (including that user) who have the same course, then keep only the
// users whose list has at least two entries, i.e. at least one other user.
// $lookup requires MongoDB 3.2 or later.
User.aggregate([
    // $ne: null drops users whose course is missing or null. $exists alone
    // would keep null courses, and $lookup (which treats null and missing
    // fields as equal) would then pair them with every user without a course.
    { "$match": { "course": { "$ne": null } } },
    {
        "$lookup": {
            // assumes the User model is stored in the "users" collection
            // (Mongoose's default pluralized name) — adjust if it differs
            "from": "users",
            "localField": "course",
            "foreignField": "course",
            "as": "users_courses"
        }
    },
    // users_courses always contains the user itself, so an element at
    // index 1 means at least one other user shares the course
    { "$match": { "users_courses.1": { "$exists": true } } }
], callback);

Testing in the mongo shell

// Seed data for the example: three "maths" users, two "history" users, and one
// user each for "english" and "science". insertMany() replaces the deprecated
// insert() for inserting several documents at once.
db.test.insertMany([
    { "name": "a",  "course": "maths" },
    { "name": "b",  "course": "english" },
    { "name": "c",  "course": "maths" },
    { "name": "d",  "course": "science" },
    { "name": "e",  "course": "maths" },
    { "name": "f",  "course": "history" },
    { "name": "g",  "course": "history" }
])

Run the aggregate operation

// Same pipeline as above, run against the "test" collection seeded above.
db.test.aggregate([
    // $ne: null drops documents whose course is missing or null, which
    // $lookup would otherwise match against every course-less document
    { "$match": { "course": { "$ne": null } } },
    {
        "$lookup": {
            // must name the collection being self-joined: the data lives in "test"
            "from": "test",
            "localField": "course",
            "foreignField": "course",
            "as": "users_courses"
        }
    },
    // the list always includes the document itself; index 1 existing means
    // at least one other document has the same course
    { "$match": { "users_courses.1": { "$exists": true } } }
])

Sample Output

/* 1 */
{
    "_id" : ObjectId("58948c0dd04f1bbdbf331ea7"),
    "name" : "a",
    "course" : "maths",
    "users_courses" : [ 
        {
            "_id" : ObjectId("58948c0dd04f1bbdbf331ea7"),
            "name" : "a",
            "course" : "maths"
        }, 
        {
            "_id" : ObjectId("58948c0dd04f1bbdbf331ea9"),
            "name" : "c",
            "course" : "maths"
        }, 
        {
            "_id" : ObjectId("58948c0dd04f1bbdbf331eab"),
            "name" : "e",
            "course" : "maths"
        }
    ]
}

/* 2 */
{
    "_id" : ObjectId("58948c0dd04f1bbdbf331ea9"),
    "name" : "c",
    "course" : "maths",
    "users_courses" : [ 
        {
            "_id" : ObjectId("58948c0dd04f1bbdbf331ea7"),
            "name" : "a",
            "course" : "maths"
        }, 
        {
            "_id" : ObjectId("58948c0dd04f1bbdbf331ea9"),
            "name" : "c",
            "course" : "maths"
        }, 
        {
            "_id" : ObjectId("58948c0dd04f1bbdbf331eab"),
            "name" : "e",
            "course" : "maths"
        }
    ]
}

/* 3 */
{
    "_id" : ObjectId("58948c0dd04f1bbdbf331eab"),
    "name" : "e",
    "course" : "maths",
    "users_courses" : [ 
        {
            "_id" : ObjectId("58948c0dd04f1bbdbf331ea7"),
            "name" : "a",
            "course" : "maths"
        }, 
        {
            "_id" : ObjectId("58948c0dd04f1bbdbf331ea9"),
            "name" : "c",
            "course" : "maths"
        }, 
        {
            "_id" : ObjectId("58948c0dd04f1bbdbf331eab"),
            "name" : "e",
            "course" : "maths"
        }
    ]
}

/* 4 */
{
    "_id" : ObjectId("58948c0dd04f1bbdbf331eac"),
    "name" : "f",
    "course" : "history",
    "users_courses" : [ 
        {
            "_id" : ObjectId("58948c0dd04f1bbdbf331eac"),
            "name" : "f",
            "course" : "history"
        }, 
        {
            "_id" : ObjectId("58948c0dd04f1bbdbf331ead"),
            "name" : "g",
            "course" : "history"
        }
    ]
}

/* 5 */
{
    "_id" : ObjectId("58948c0dd04f1bbdbf331ead"),
    "name" : "g",
    "course" : "history",
    "users_courses" : [ 
        {
            "_id" : ObjectId("58948c0dd04f1bbdbf331eac"),
            "name" : "f",
            "course" : "history"
        }, 
        {
            "_id" : ObjectId("58948c0dd04f1bbdbf331ead"),
            "name" : "g",
            "course" : "history"
        }
    ]
}

Upvotes: 0

owais
owais

Reputation: 4922

One way is to push one function per candidate into an array and then call them one after another. Another way is to use observables, which work well for asynchronous operations inside loops.

// Build one deferred operation per candidate, then run them one at a time.
// Both steps happen inside the find() callback: code placed after
// User.find(...) would run before the query returns, while the list is empty.
User.find({}, function (err, doc) {
    if (err) {
        // handle the query error here
        return;
    }

    // Each operation takes a `done` callback so the runner below knows when
    // it has finished. The candidate is captured by the closure; it is not
    // re-declared as a parameter, which would shadow it.
    var candidatesOps = doc.map(function (candidate) {
        return function (done) {
            // other find operations for `candidate` in here;
            // call done() when they have all finished, or done(err) on failure
        };
    });

    // Start operation i only after operation i - 1 has called done().
    // (If done() is ever called synchronously, this recurses once per candidate.)
    (function runNext(i) {
        if (i >= candidatesOps.length) {
            // all candidates processed
            return;
        }
        candidatesOps[i](function (opErr) {
            if (opErr) {
                // handle the operation error here
                return;
            }
            runNext(i + 1);
        });
    })(0);
});

Upvotes: 0

rsp
rsp

Reputation: 111356

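You can use the async library for that, e.g. `async.eachSeries`. It calls the iterator for one item at a time and only moves on to the next item once the iterator's callback has been called: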

async.eachSeries(doc, function (candidate, cb) {
    //other find operations in here
    ...
    // and you call cb() once they're done (important!)
    // or call cb('some error') if it failed 
}, function (err) {
    if (err) {
        // this means that some cb() above was called with error
    } else {
        // here all candidates are processed successfully
    }
});

See: https://caolan.github.io/async/docs.html#eachSeries

Upvotes: 1

Related Questions