isaac9A

Reputation: 903

mongo aggregation counting unique elements in fields

My query returns documents of the form:

{ _id: '48nmqsyxmswpkkded2ac_331fabf34fcd3935',
  actions: { sales: { pixel: [Object] } },
  date: Sun Jul 27 2014 00:00:00 GMT-0400 (EDT),
  client: '48nmqsyxmswpkkded2ac',
  campaignId: null,
  domain: null,
  affiliate: '964540',
  subaffiliate: '11776-hlc-a-click-here' }

I would like to group by client, and have four fields for the number of unique items that appear in campaignId, domain, affiliate, and subaffiliate. Ideally I would like documents returned in the form:

{ client: '48nmqsyxmswpkkded2ac',
  affiliates: 45,
  subaffiliates: 51,
  campaignIds: 2,
  domains: 234 }

I am trying something like the following:

[
    {
        $match: {
            date: { $gte: dutil.getDateOnly(self.date), $lt: dutil.getDateOnly(new Date()) }
        }
    },
    {
        $group: {
            _id: "$client",
            affiliate: { $addToSet: "$affiliate" },
            //subaffiliate: { $addToSet: "$subaffiliate" }
        }
    }
],
{
    allowDiskUse: true,
    cursor: { batchSize: 40000 }
}

The idea is that I can just take affiliate.length afterwards to get the number of unique elements. However, this doesn't work (no documents are returned) when I uncomment the subaffiliate line, because the 66MB BSON object returned is greater than Mongo's 16MB limit.

@Neil Lunn this is the problem now. I have tried changing the batch size to 40 or 10, but I get the same error. Even returning a cursor and using each throws the same error. I thought the allowDiskUse option would let Mongo write to disk and thus avoid the 16MB limit. Am I wrong? Some ideas I have now are:

  1. async.forEach, running a separate pipeline with a $match for each individual client
  2. some sort of messy conditional using $cond to create a count for each

Both of these ideas seem suboptimal and messy. Any suggestions? Mongo means humongous; isn't it ironic that I am bogged down by a 16MB limit :)

Upvotes: 0

Views: 858

Answers (1)

Neil Lunn

Reputation: 151230

If you are really blowing up the BSON limit here (and let's be clear that what you are blowing up is the "sets" you are creating per grouping document), then you truly have much larger counts than your expected results indicate, at least in some documents.

The real catch here is that $addToSet is just not going to cut it for you because of the BSON limit. The big problem to overcome is that there is really no other way of accumulating all of the field counts in a single pass, at least not while keeping the accumulated documents under the BSON limit.

The bottom line is separate queries, and then essentially "aggregating" the results from those. What I mean by separate queries is "efficient size" forms that will just get the count of each distinct field value. So essentially this:

[
    { "$group": {
        "_id": {
            "client": "$client",
            "afilliate": "$affiliate"
        }
    }},
    { "$group": {
        "_id": "$_id.client",
        "affiliates": { "$sum": 1 },
        "subaffiliates": { "$sum": 0 },
        "domains": { "$sum": 0 },
        "campaignids": { "$sum": 0 }
    }}
]

The basic premise here is to return the "count" for a particular field, mainly by finding the "distinct" items in an initial $group that uses that field as part of the grouping "key". The second grouping then counts the now distinct terms under each client key. The full notation there seems contrived, but it removes the need to test for "null" or non-existent fields in later processing.
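
For illustration only, the per-client output of that pipeline would look something like this (the numbers are purely hypothetical), with zeros in every field the query was not counting:

{ "_id": "48nmqsyxmswpkkded2ac",
  "affiliates": 45,
  "subaffiliates": 0,
  "domains": 0,
  "campaignids": 0 }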

Again, the catch with this approach is that you need to "combine" the results of all the queries in order to determine the final result. The query form as shown cannot possibly breach the BSON limit on documents, so the only real catch now is "combining" the results from each query, and doing so in an efficient manner.
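
As a rough sketch of that combine step, and assuming the partial documents have been gathered with the client id moved into a client field (the cursor example further down does exactly this, writing them to an intermediate collection), the final pipeline is just a $sum over each counter field; the field names here follow the hand-written example above:

[
    { "$group": {
        "_id": "$client",
        "affiliates": { "$sum": "$affiliates" },
        "subaffiliates": { "$sum": "$subaffiliates" },
        "domains": { "$sum": "$domains" },
        "campaignids": { "$sum": "$campaignids" }
    }}
]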

The actual approach varies depending on whether your number of result documents requires a "cursor" to iterate, or can otherwise be handled in singular results where each response comes in under the 16MB BSON limit.

The two approaches here are either to:

  1. Work with returned cursors and generally create another collection on the server to issue a further aggregate statement against.

  2. If the resulting "sets" come in under the 16MB BSON limit, then you could look at doing the aggregation of the results in memory, which is probably going to be the fastest solution where it is an option.

The general case comes down to either processing the "cursor" like this:

var async = require('async'),
    pluralize = require('pluralize'),
    mongoose = require('mongoose'),
    Schema = mongoose.Schema;

mongoose.connect('mongodb://localhost/test');

var sampleSchema = new Schema({
  date: Date,
  client: String,
  campaignId: Schema.Types.Mixed,
  domain: Schema.Types.Mixed,
  affiliate: String,
  subaffiliate: String
});

var targetSchema = new Schema({},{ strict: false });

var Sample = mongoose.model( 'Sample', sampleSchema, 'sample' );
var Target = mongoose.model( 'Target', targetSchema, 'target' );

function getFields() {
  return Object.keys( Sample.schema.paths ).filter(function(field) {
    var blacklist = ['_id','__v','date','client'];
    return blacklist.indexOf(field) == -1;
  })
}


function getPipe(field,fields) {

  var grp = { "client": "$client" };
  grp[field] = "$" + field;

  var pipe = [{ "$group": { "_id": grp }}];

  var obj = { "_id": "$_id.client" };
  fields.forEach(function(current) {
    var plural = pluralize( current );

    obj[plural] = {
      "$sum": (field == current) ? 1 : 0
    };
  });

  pipe.push({ "$group": obj });
  return pipe;

}

var fields = getFields();
var tasks = {};

fields.forEach(function(current) {
  var pipe = getPipe( current, fields );
  tasks[current] = function(callback) {
    var cursor = Sample.collection.aggregate(
      pipe,
      { cursor: { batchSize: 100 } }
    );

    var bulk = Target.collection.initializeOrderedBulkOp();
    var counter = 0;

    cursor.on("data",function(item) {
      var client  = item._id;
      delete item._id;
      item.client = client;
      console.log(item);
      bulk.insert(item);
      counter++;

      if ( counter % 1000 == 0 )
        bulk.execute(function(err,result) {
          if (err) throw err;
          bulk = Target.collection.initializeOrderedBulkOp();
        });
    });

    cursor.on("end",function() {
      if ( counter % 1000 != 0 )
        bulk.execute(function(err,result) {
          if (err) throw err;
          callback(null,counter);
        });
    });

  };
});

mongoose.connection.on("open",function(err,conn) {

  async.parallel(
    tasks,
    function( err, results ) {
      if (err) throw err;

      console.log( results );

      var obj = { "_id": "$client" };
      fields.forEach(function(field) {
        var plural = pluralize( field );
        obj[plural] = { "$sum": "$" + plural };
      });

      var pipe = [{ "$group": obj }];

      var cursor = Target.collection.aggregate(
        pipe,
        { cursor: { batchSize: 100 } }
      );

      // do something with the cursor, like pipe or other event process

    }
  );

});

Or if the results allow then in memory:

var async = require('async'),
    pluralize = require('pluralize'),
    mongoose = require('mongoose'),
    Schema = mongoose.Schema;

mongoose.connect('mongodb://localhost/test');

var sampleSchema = new Schema({
  date: Date,
  client: String,
  campaignId: Schema.Types.Mixed,
  domain: Schema.Types.Mixed,
  affiliate: String,
  subaffiliate: String
});

var Sample = mongoose.model( 'Sample', sampleSchema, 'sample' );

function getFields() {
  return Object.keys( Sample.schema.paths ).filter(function(field) {
    var blacklist = ['_id','__v','date','client'];
    return blacklist.indexOf(field) == -1;
  })
}


function getPipe(field,fields) {

  var grp = { "client": "$client" };
  grp[field] = "$" + field;

  var pipe = [{ "$group": { "_id": grp }}];

  var obj = { "_id": "$_id.client" };
  fields.forEach(function(current) {
    var plural = pluralize( current );

    obj[plural] = {
      "$sum": (field == current) ? 1 : 0
    };
  });

  pipe.push({ "$group": obj });
  return pipe;

}

var fields = getFields();
var tasks = {};

fields.forEach(function(current) {
  var pipe = getPipe( current, fields );
  tasks[current] = function(callback) {
    Sample.collection.aggregate(
      pipe,
      function(err,result) {
        callback(err,result);
      }
    );

  };
});

mongoose.connection.on("open",function(err,conn) {

  async.waterfall(
    [
      function(callback) {
        async.parallel(
          tasks,
          function( err, results ) {
            callback(err,results);
          }
        );
      },
      function(results,callback) {
        async.concat(
          fields,
          function(item,callback) {
            callback(null,results[item]);
          },
          function(err,results) {
            callback(err,results);
          }
        );
      },
      function(results,callback) {
        var obj = {};
        results.forEach(function(item) {
          // Seed a zeroed counter document the first time a client id is seen
          if ( !obj.hasOwnProperty(item._id) ) {
            var blank = {};
            fields.map(function(field) {
              return pluralize(field);
            }).forEach(function(field) {
              blank[field] = 0;
            });
            obj[item._id] = blank;
          }
          // Always accumulate this partial result's counts, including the first
          fields.map(function(field) {
            return pluralize(field);
          }).forEach(function(field) {
            obj[item._id][field] += item[field];
          });
        });
        callback(null,obj);
      },
      function(results,callback) {
        // Reshape the keyed object into an array of documents,
        // without re-using the "results" name for the new array
        var listing = Object.keys( results ).map(function(id) {
          var doc = { _id: id };
          fields.map(function(field) {
            return pluralize( field );
          }).forEach(function(field) {
            doc[field] = results[id][field];
          });
          return doc;
        });
        callback(null,listing);
      }
    ],
    function(err,results) {
      if (err) throw err;
      console.log( results );
    }
  );
});

In both cases there is a little use of the mongoose library, at least to define a "schema" for the collection being referenced. There is also a little introspection there in order to determine the "list" of fields to use, but you could just do this with a standard array.
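
For example, if you would rather skip the schema introspection, a hard-coded list (these names simply mirror the fields in your sample document) works the same way:

var fields = [ 'campaignId', 'domain', 'affiliate', 'subaffiliate' ];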

Each case uses async.parallel as a way to execute the statements on the server roughly in unison and then "combine" the results. It really comes down to how large the result sets actually are, as to whether it is more efficient to process a result via a cursor, or even to use the $out specification and then something like .eval() to essentially keep everything as a server side operation. That is not the best option though, as .eval() has its own issues which you should read up on before using it.
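
As a rough sketch of the $out idea (the collection names here are purely illustrative), each per-field pipeline would write its partial results to its own collection on the server instead of returning them over the wire:

var pipe = getPipe( current, fields );
// $out must be the final stage; each field gets its own output collection
pipe.push({ "$out": "partials_" + current });

Sample.collection.aggregate( pipe, function(err,result) {
  if (err) throw err;
  // the partial counts now live server side in "partials_<field>"
});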

At any rate, the parallel processing approach to combining the results is the only real option I can think of given that:

  1. $addToSet creates far too big a document before it could even be reduced with a $size operator, so the problem has already occurred by that point (see the sketch after this list).

  2. Making the grouping more granular to say a "single day" first would not yield the correct "distinct" counts. And not before trying anything that results in the same problem as 1.

  3. You cannot "conditionally evaluate" distinct values across documents. All aggregation operations work on one document at a time, unless you are deliberately combining the documents, in which case back to problem 1 again.

  4. You cannot try to group all keys at once, as again grouping on the values for multiple fields will result in incorrect distinct counts as they are only "distinct" within certain combinations and not by the single "client" key.
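
To make point 1 concrete, this is the "obvious" single-pass form you would like to write; it is only a sketch, and it is exactly the shape that blows the 16MB limit on the intermediate grouping documents before $size ever gets a chance to reduce them:

[
    { "$group": {
        "_id": "$client",
        "affiliates": { "$addToSet": "$affiliate" },
        "subaffiliates": { "$addToSet": "$subaffiliate" },
        "domains": { "$addToSet": "$domain" },
        "campaignIds": { "$addToSet": "$campaignId" }
    }},
    { "$project": {
        "affiliates": { "$size": "$affiliates" },
        "subaffiliates": { "$size": "$subaffiliates" },
        "domains": { "$size": "$domains" },
        "campaignIds": { "$size": "$campaignIds" }
    }}
]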

Try the in memory option first; as long as the full combined response stays under 16MB, it will be the fastest to process.

Upvotes: 2
