LittleBigFrog
LittleBigFrog

Reputation: 95

Mongodb: use $sample after $group

I have the following data set:

{company:"One",  employee:"John"},
{company:"One",  employee:"Mike"},  
{company:"One",  employee:"Donald"},  
{company:"One",  employee:"Mickey"},  
{company:"Two",  employee:"Johnny"},  
{company:"Two",  employee:"David"}, 

Ideally, I want a query that returns all distinct companies, number of employees for each company, random employee for each company

{Company: "One" , employee_count=4, randomemployee="Donald"},
{Company: "Two" , employee_count=2, randomemployee="David"},

I did find a way to get the company and employee_count using aggregate/$group. However, I can't find a way to add the randomemployee within the same query.

My aggregation:

/**
 * Count the documents of each company and return the groups ordered by
 * descending count.
 *
 * @param {Object} collection - Object exposing `aggregate(pipeline, callback)`.
 * @param {Function} cb - Node-style callback: `cb(err)` on failure,
 *   `cb(null, groups)` on success, where each group is `{_id, total}`.
 */
function aggr (collection,cb){
   var pipeline = [
      {$group:{_id:'$company',total:{$sum:1}}},
      {$sort:{total:-1}}
   ];
   collection.aggregate(pipeline, function(err, l1){
      // Surface aggregation failures instead of reporting them as success.
      if (err) return cb(err);
      cb(null, l1);
   });
}

I began another sample function:

/**
 * Pick one random document whose `company` equals `arg` and hand its
 * `employee` field to the callback.
 *
 * @param {Object} collection - Object exposing `aggregate(pipeline, callback)`.
 * @param {*} arg - Value the `company` field must match.
 * @param {Function} cb - Node-style callback: `cb(err)` on failure,
 *   `cb(null, employee)` on success, or `cb(null, null)` when no document
 *   matched.
 */
function onesample (collection,arg,cb){
    var pipeline = [{ $match: { "company": arg }},{ $sample: { size: 1 }}];
    collection.aggregate(pipeline, function(err, item){
        // Surface aggregation failures instead of dereferencing a missing result.
        if (err) return cb(err);
        // An empty match would otherwise throw on item[0].employee.
        if (!item || item.length === 0) return cb(null, null);
        cb(null, item[0].employee);
    });
}

But I'm getting lost with the callbacks and loops. Is there an elegant way to do this within one query?

Thanks a lot.

Following your answer, I tried the following code. I have an issue with the callback of async.forEachOf: it seems it doesn't finish before moving on to the next step. Any clue?

var async = require("async");   
var MongoClient = require('mongodb').MongoClient;
var assert = require('assert');
var url = 'mongodb://localhost:27017/eyc0';



// Waterfall of six steps: connect, open the 'kodes' collection, aggregate
// counts per `ouat` value, then attach one randomly sampled `code` to each
// aggregated row and print the results.
// No step inspects its `err` argument, and no final callback is passed to
// async.waterfall.
async.waterfall ([
     // Step 1: connect to the database at `url` and pass the handle on.
     function(cb) { 
            MongoClient.connect(url, function(err, db) {
            cb(null,db)
             })
    },
     // Step 2: obtain the 'kodes' collection.
     function (db, cb) {
             db.collection('kodes', function(err, coll) {
            cb(null,db,coll)
             })
    },
      // Step 3: group by `ouat`, count the documents per group, sort by count
      // descending, and reshape each row to {total, company}.
      function (db,coll, cb) {
                 var pipeline = [
                {"$group": {"_id": "$ouat","total": { "$sum": 1}}},
                {"$sort":{"total":-1} },
                {"$project":{"_id": 0,"total":1,"company": "$_id"}}];

                coll.aggregate(pipeline).toArray(function(err, dlist){
                cb(null,db,coll,dlist)
                })
    },
        // Step 4: pass-through; only forwards its arguments (debug hook).
        function (db,coll,dlist, cb) {
            // console.log(dlist)
            cb(null,db,coll,dlist)
    },
          // Step 5: for every aggregated row, sample one document with the same
          // `ouat` value, store its `code` on the row as `randref`, and collect
          // the row in dlist2.
          // NOTE(review): async.forEachOf is given no completion callback here,
          // and the waterfall `cb` below is invoked right after the iteration is
          // started rather than from such a callback, so the next step
          // presumably runs before the aggregate callbacks have fired.
          function (db,coll,dlist, cb) {
             var dlist2 = []
                async.forEachOf( 
                    dlist,
                    // This inner `cb` is the per-item callback and shadows the
                    // waterfall `cb` of the enclosing step.
                    function(item, key, cb){
                        var pipeline = [{ "$match": { "ouat": item.company } },{ "$sample": { size: 1 } }];
                        coll.aggregate(pipeline, function (err, data) {
                        item["randref"] = data[0].code;
                        console.log(item.company)
                        dlist2.push(item)
                        cb()
                    });

                    }
                );
                 cb(null,db,coll,dlist,dlist2);

    },
        // Step 6: print both lists. `cb` is never called here.
        function (db,coll,dlist,dlist2, cb) {
            console.log(dlist2)
            console.log(dlist)
    },
    ])

Upvotes: 3

Views: 1168

Answers (2)

RafaelCaballero
RafaelCaballero

Reputation: 1613

// Single-pipeline approach: collect each company's employees into an array,
// then pick one array element by a computed position.
// NOTE: Math.random() below is a JavaScript expression evaluated once, while
// this pipeline literal is being built, so every company receives the same
// `randomvalue` within a single run of the query.
db.comp.aggregate([
// Stage 1: one document per company; `emp` is built with $addToSet
// (presumably holding each distinct employee value once — confirm against
// the MongoDB docs).
{$group:{_id:'$company',emp:{$addToSet:'$employee'}}},
// Stage 2: employee_count = size of `emp`; attach the pre-computed random number.
{$project:{emp:1,employee_count:{'$size':'$emp'}, 
                 randomvalue:{'$literal':Math.random()}}},
// Stage 3: randomposition = floor(randomvalue * employee_count).
{$project:{emp:1,employee_count:1,
           randomposition:{'$floor':
                    {'$multiply':['$randomvalue', '$employee_count']}}}},
// Stage 4: rename _id to Company and pick the element of `emp` at randomposition.
{$project:{'Company':'$_id', _id:0, employee_count:1,  
            randomemployee:{'$arrayElemAt':['$emp','$randomposition']}}},
// Stage 5: order the output by company name.
{$sort:{Company:1}} ])

Seems to work!

A couple of results:

{ "employee_count" : 4, "Company" : "One", "randomemployee" : "Mike" }
{ "employee_count" : 2, "Company" : "Two", "randomemployee" : "Johnny" }

{ "employee_count" : 4, "Company" : "One", "randomemployee" : "Mickey" }
{ "employee_count" : 2, "Company" : "Two", "randomemployee" : "David" }

Upvotes: 2

chridam
chridam

Reputation: 103435

There's one approach that involves a single query. It comes close, but it is not as performant (as it uses $unwind) and won't give you the desired result (it returns only the filtered company):

// Single-query variant: returns the employee count of one company (`arg`)
// together with one randomly sampled employee of that company.
var pipeline = [
    // One document per company, with its size and the list of its employees.
    {
        "$group": {
            "_id": "$company",
            "total": { "$sum": 1 },
            "employees": { "$push": "$employee" }
        }
    },
    // Rename the fields to the requested output shape.
    {
        "$project": {
            "_id": 0,
            "company": "$_id",
            "employee_count": "$total",
            "randomemployee": "$employees"
        }
    },
    // One document per (company, employee) pair so that $sample can pick one.
    { "$unwind": "$randomemployee" },
    { "$match": { "company": arg } },
    { "$sample": { size: 1 } }
];
collection.aggregate(pipeline, function(err, result){
    // Report a failed aggregation instead of silently logging `undefined`.
    if (err) return console.error(err);
    console.log(result);
});

However, for a solution that uses callbacks from multiple queries, this can be handled easily with use of async module.

To get all distinct companies, number of employees for each company, random employee for each company consider using the async.waterfall() function where the first task returns the aggregation results with all distinct companies and number of employees for each company.

The second task uses the results from task 1 above and iterates over them using async.forEachOf(). This allows you to perform an asynchronous task for each item and, when they're all done, do something else. For each document in the array, run the aggregation operation that uses the $sample operator to get a random document with the specified company. With each result, create an extra field with the random employee and push the document to an array holding the final results, which you can access once all the tasks have finished.

Below shows this approach:

var async = require("async");    
async.waterfall([

        // Task 1: load one document per distinct company with its employee count
        function(callback) {
            var pipeline = [
                {
                    "$group": {
                        "_id": "$company",
                        "total": { "$sum": 1 }
                    }
                },
                {
                    "$project": {
                        "_id": 0,
                        "company": "$_id",
                        // "$total" is a field path; a bare "total" would not read the grouped sum
                        "employee_count": "$total"
                    }
                }
            ];
            collection.aggregate(pipeline, function(err, results){
                if (err) return callback(err);
                // The first argument is the error slot; the results go second so
                // that waterfall hands them to task 2
                callback(null, results);
            });
        },

        // Task 2: load a random employee for each of the aggregated results of task 1
        // (won't be called before task 1's "task callback" has been called)
        function(results, callback) {
            var docs = [];
            async.forEachOf(
                results,
                // `done` is the per-item callback; named differently so it does
                // not shadow the task callback
                function(value, key, done) {
                    var pipeline = [
                        { "$match": { "company": value.company } },
                        { "$sample": { size: 1 } }
                    ];
                    collection.aggregate(pipeline, function (err, data) {
                        if (err) return done(err);
                        value["randomemployee"] = data[0].employee;
                        docs.push(value);
                        done();
                    });
                },
                // Runs once every item has called `done`, or as soon as one of them fails
                function(err) {
                    if (err) return callback(err);
                    callback(null, docs);
                }
            );
        }
    ], function(err, result) {
        // NOTE(review): `next` is not defined in this snippet; presumably it is
        // an error handler from the enclosing scope — confirm
        if (err) return next(err);
        console.log(JSON.stringify(result, null, 4));
    }
);

The async.series() function is useful if you need to execute a set of async functions in a certain order.

Consider the following approach if you wish to get all the distinct companies and their employee counts as one result and a random employee as another:

var async = require("async"),
    locals = {},
    company = "One";
async.series([
        // Task 1: load one random document of the chosen company
        // (stored under the `randomcompany` key of `locals`)
        function(callback) {
            var pipeline = [
                { "$match": { "company": company } },
                { "$sample": { size: 1 } }
            ];
            collection.aggregate(pipeline, function(err, result){
                if (err) return callback(err);
                locals.randomcompany = result[0];
                callback();
            });
        },
        // Task 2: load full aggregation results (won't be called before task 1's "task callback" has been called)
        function(callback) {
            var pipeline = [
                {
                    "$group": {
                        "_id": "$company",
                        "total": { "$sum": 1 }
                    }
                },
                {
                    "$project": {
                        "_id": 0,
                        "company": "$_id",
                        // "$total" is a field path; a bare "total" would not read the grouped sum
                        "employee_count": "$total"
                    }
                }
            ];
            collection.aggregate(pipeline, function(err, result){
                if (err) return callback(err);
                locals.aggregation = result;
                callback();
            });
        }
    ], function(err) { //This function gets called after the two tasks have called their "task callbacks"
        // NOTE(review): `next` is not defined in this snippet; presumably it is
        // an error handler from the enclosing scope — confirm
        if (err) return next(err);
        //Here locals will be populated with 'randomcompany' and 'aggregation'
        console.log(JSON.stringify(locals, null, 4));
    }
);

Upvotes: 2

Related Questions