Jinesh

Reputation: 95

Count number of items dynamically through date range

Currently I am dealing with a large set of data stored in MongoDB (a single collection of 2M documents, drawn from a larger collection of 20M). Fields: id, item name, item type, item description and date.

I want to dynamically count how many items fall within each week-long or month-long date span across the entire collection, e.g. from 2014-01-01 to 2014-01-07 there are 20 items, from 2014-01-08 to 2014-01-16 there are 50 items, etc.

Using Python, how can I accomplish this? Are there libraries for this, or would it be custom code?

Alternatively, should this be done all through MongoDB?

Upvotes: 1

Views: 688

Answers (1)

Neil Lunn

Reputation: 151132

The general way is of course to have the database handle aggregation. If you want data from within a "week range" then there are a couple of ways to go about it, just depending on which approach you actually want for your case.

Group by ISO Week

Demonstrating with the "month of May" as an example, you would have something like:

from datetime import datetime, timedelta
from pymongo import MongoClient

db = MongoClient().test   # the sample "test" database used throughout

startdate = datetime(2018,5,1)
enddate = datetime(2018,6,1)

result = db.sales.aggregate([
  { '$match': { 'date': { '$gte': startdate, '$lt': enddate } } },
  { '$group': {
    '_id': {
      'year': { '$year': '$date' },
      'week': { '$isoWeek': '$date' }
    },
    'totalQty': { '$sum': '$qty' },
    'count': { '$sum': 1 }
  }},
  { '$sort': { '_id': 1 } }
])

It's a fairly simple invocation using the $year and $isoWeek or possibly even the $week operators, depending on what your MongoDB version actually supports. All you need to do is specify these within the _id grouping key of $group and then choose other accumulators like $sum according to what you actually need to "accumulate" within that grouping.
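
The question also asks about month-long spans, and the same shape covers those directly; a minimal sketch under the same assumptions, simply swapping the ISO week operator for $month:

# Minimal sketch: group on calendar month instead of ISO week,
# reusing the same db, startdate and enddate as above.
monthly = db.sales.aggregate([
  { '$match': { 'date': { '$gte': startdate, '$lt': enddate } } },
  { '$group': {
    '_id': {
      'year': { '$year': '$date' },
      'month': { '$month': '$date' }
    },
    'totalQty': { '$sum': '$qty' },
    'count': { '$sum': 1 }
  }},
  { '$sort': { '_id': 1 } }
])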

The $week and $isoWeek results just come out slightly different, where the latter is more aligned with the isoweek library for Python and similar implementations in other languages. In general you can probably reconcile the two by adjusting the week number by 1. See the documentation for more detail.
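
As a quick sanity check, Python's own datetime.isocalendar() and the isoweek package agree with what $isoWeek produces; a small sketch:

from datetime import datetime
from isoweek import Week

# 2018-05-01 is a Tuesday in ISO week 18 of 2018, which runs
# Monday 2018-04-30 through Sunday 2018-05-06.
print(datetime(2018,5,1).isocalendar())   # (2018, 18, 2)
print(Week(2018,18).monday())             # 2018-04-30
print(Week(2018,18).sunday())             # 2018-05-06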

In this case you can optionally just let the database do the "aggregation" work and then derive your "desired dates" from the output. i.e. for Python you can transform the result with datetime values corresponding to each week as:

# "Week" here comes from the isoweek package: from isoweek import Week
result = list(result)
for item in result:
  item.update({
    'start': datetime.combine(
      Week(item['_id']['year'],item['_id']['week']).monday(),
      datetime.min.time()
    ),
    'end': datetime.combine(
      Week(item['_id']['year'],item['_id']['week']).sunday(),
      datetime.max.time()
    )
  })
  item.pop('_id',None)

Group by self-defined intervals

If sticking to the ISO standards is not for you, then the alternate approach is to define your own "intervals" on which to accumulate the "grouping". The main tool with MongoDB here is $bucket, along with a little list processing beforehand:

cuts = [startdate]
date = startdate

while ( date < enddate ):
  date = date + timedelta(days=7)
  if ( date > enddate ):
    date = enddate
  cuts.append(date)

alternate = db.sales.aggregate([
  { '$match': { 'date': { '$gte': startdate, '$lt': enddate } } },
  { '$bucket': {
    'groupBy': '$date',
    'boundaries': cuts,
    'output': {
      'totalQty': { '$sum': '$qty' },
      'count': { '$sum': 1 }
    }
  }},
  { '$project': {
    '_id': 0,
    'start': '$_id',
    'end': {
      '$cond': {
        'if': {
          '$gt': [
            { '$add': ['$_id', (1000 * 60 * 60 * 24 * 7) - 1] },
            enddate
          ]
        },
        'then': { '$add': [ enddate, -1 ] },
        'else': {
          '$add': ['$_id', (1000 * 60 * 60 * 24 * 7) - 1]
        }
      }
    },
    'totalQty': 1,
    'count': 1
  }}
])

Instead of using predefined operators such as $week or $isoWeek, here we work out the "intervals of 7 days" from a given query start date and produce an array of those intervals, always of course ending with the "maximum" value from the range of data being selected.

This list is then given as the "boundaries" option to the $bucket aggregation stage. It is really just a list of values which tells the stage what to accumulate "up to" for each "grouping" produced. Note that in the $project above, $add applied to a BSON date and a plain number treats the number as milliseconds, so adding (1000 * 60 * 60 * 24 * 7) - 1 yields the last millisecond of each seven-day interval.

The $bucket stage is really just a "shorthand" implementation of the $switch aggregation operator within a $group pipeline stage. Both of those operators require MongoDB 3.4, but you can do the same thing using $cond within a $group, just nesting each "else" condition for each "boundary" value. It's possible, but a bit more involved, and you really should be using MongoDB 3.4 as a minimum version by now anyway.
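
To make that equivalence concrete, the same cuts list can be expanded into a $switch expression directly; a rough sketch, assuming the same db, startdate, enddate and cuts as above:

# Each branch tests whether the date falls before the next boundary and,
# if so, groups on the start date of that interval. Branches are evaluated
# in order, so the first match wins.
branches = [
    { 'case': { '$lt': [ '$date', cuts[i + 1] ] }, 'then': cuts[i] }
    for i in range(len(cuts) - 1)
]

switched = db.sales.aggregate([
  { '$match': { 'date': { '$gte': startdate, '$lt': enddate } } },
  { '$group': {
    # "default" is unreachable here given the $match, but $switch requires
    # one when no branch could match.
    '_id': { '$switch': { 'branches': branches, 'default': cuts[-2] } },
    'totalQty': { '$sum': '$qty' },
    'count': { '$sum': 1 }
  }},
  { '$sort': { '_id': 1 } }
])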

If you find you really must support older versions, a $cond-within-$group form is included in the examples below, showing how to transform that same cuts list into such a statement. That means you can do essentially the same thing all the way back to MongoDB 2.2, where the aggregation framework was introduced.

Examples

As a full example, consider the following listing, which inserts a month's worth of random data and then runs both presented aggregation options over it:

from random import randint
from datetime import datetime, timedelta, date
from isoweek import Week

from pymongo import MongoClient
from bson.json_util import dumps, JSONOptions
import bson.json_util

client = MongoClient()
db = client.test

db.sales.delete_many({})

startdate = datetime(2018,5,1)
enddate = datetime(2018,6,1)

currdate = startdate

batch = []

while ( currdate < enddate ):
  currdate = currdate + timedelta(hours=randint(1,24))
  if ( currdate > enddate ):
    currdate = enddate
  qty = randint(1,100)
  if ( currdate < enddate ):
    batch.append({ 'date': currdate, 'qty': qty })

  if ( len(batch) >= 1000 ):
    db.sales.insert_many(batch)
    batch = []

if ( len(batch) > 0):
  db.sales.insert_many(batch)
  batch = []

result = db.sales.aggregate([
  { '$match': { 'date': { '$gte': startdate, '$lt': enddate } } },
  { '$group': {
    '_id': {
      'year': { '$year': '$date' },
      'week': { '$isoWeek': '$date' }
    },
    'totalQty': { '$sum': '$qty' },
    'count': { '$sum': 1 }
  }},
  { '$sort': { '_id': 1 } }
])

result = list(result)
for item in result:
  item.update({
    'start': datetime.combine(
      Week(item['_id']['year'],item['_id']['week']).monday(),
      datetime.min.time()
    ),
    'end': datetime.combine(
      Week(item['_id']['year'],item['_id']['week']).sunday(),
      datetime.max.time()
    )
  })
  item.pop('_id',None)

print("Week grouping")
print(
  dumps(result,indent=2,
    json_options=JSONOptions(datetime_representation=2)))

cuts = [startdate]
date = startdate

while ( date < enddate ):
  date = date + timedelta(days=7)
  if ( date > enddate ):
    date = enddate
  cuts.append(date)

alternate = db.sales.aggregate([
  { '$match': { 'date': { '$gte': startdate, '$lt': enddate } } },
  { '$bucket': {
    'groupBy': '$date',
    'boundaries': cuts,
    'output': {
      'totalQty': { '$sum': '$qty' },
      'count': { '$sum': 1 }
    }
  }},
  { '$project': {
    '_id': 0,
    'start': '$_id',
    'end': {
      '$cond': {
        'if': {
          '$gt': [
            { '$add': ['$_id', (1000 * 60 * 60 * 24 * 7) - 1] },
            enddate
          ]
        },
        'then': { '$add': [ enddate, -1 ] },
        'else': {
          '$add': ['$_id', (1000 * 60 * 60 * 24 * 7) - 1]
        }
      }
    },
    'totalQty': 1,
    'count': 1
  }}
])

alternate = list(alternate)

print("Bucket grouping")
print(
  dumps(alternate,indent=2,
    json_options=JSONOptions(datetime_representation=2)))

cuts = [startdate]
date = startdate

while ( date < enddate ):
  date = date + timedelta(days=7)
  if ( date > enddate ):
    date = enddate
  if ( date < enddate ):
    cuts.append(date)

stack = []

for i in range(len(cuts)-1,0,-1):
  rec = {
    '$cond': [
      { '$lt': [ '$date', cuts[i] ] },
      cuts[i-1]
    ]
  }

  if ( len(stack) == 0 ):
    rec['$cond'].append(cuts[i])
  else:
    lval = stack.pop()
    rec['$cond'].append(lval)

  stack.append(rec)

pipeline = [
  { '$match': { 'date': { '$gte': startdate, '$lt': enddate } } },
  { '$group': {
    '_id': stack[0],
    'totalQty': { '$sum': '$qty' },
    'count': { '$sum': 1 }
  }},
  { '$sort': { '_id': 1 } },
  { '$project': {
    '_id': 0,
    'start': '$_id',
    'end': {
      '$cond': {
        'if': {
          '$gt': [
            { '$add': [ '$_id', ( 1000 * 60 * 60 * 24 * 7 ) - 1 ] },
            enddate
          ]
        },
        'then': { '$add': [ enddate, -1 ] },
        'else': {
          '$add': [ '$_id', ( 1000 * 60 * 60 * 24 * 7 ) - 1 ]
        }
      }
    },
    'totalQty': 1,
    'count': 1
  }}
]

#print(
#  dumps(pipeline,indent=2,
#    json_options=JSONOptions(datetime_representation=2)))

older = db.sales.aggregate(pipeline)
older = list(older)

print("Cond Group")
print(
  dumps(older,indent=2,
    json_options=JSONOptions(datetime_representation=2)))

With output:

Week grouping
[
  {
    "totalQty": 449,
    "count": 9,
    "start": {
      "$date": "2018-04-30T00:00:00Z"
    },
    "end": {
      "$date": "2018-05-06T23:59:59.999Z"
    }
  },
  {
    "totalQty": 734,
    "count": 14,
    "start": {
      "$date": "2018-05-07T00:00:00Z"
    },
    "end": {
      "$date": "2018-05-13T23:59:59.999Z"
    }
  },
  {
    "totalQty": 686,
    "count": 14,
    "start": {
      "$date": "2018-05-14T00:00:00Z"
    },
    "end": {
      "$date": "2018-05-20T23:59:59.999Z"
    }
  },
  {
    "totalQty": 592,
    "count": 12,
    "start": {
      "$date": "2018-05-21T00:00:00Z"
    },
    "end": {
      "$date": "2018-05-27T23:59:59.999Z"
    }
  },
  {
    "totalQty": 205,
    "count": 6,
    "start": {
      "$date": "2018-05-28T00:00:00Z"
    },
    "end": {
      "$date": "2018-06-03T23:59:59.999Z"
    }
  }
]
Bucket grouping
[
  {
    "totalQty": 489,
    "count": 11,
    "start": {
      "$date": "2018-05-01T00:00:00Z"
    },
    "end": {
      "$date": "2018-05-07T23:59:59.999Z"
    }
  },
  {
    "totalQty": 751,
    "count": 13,
    "start": {
      "$date": "2018-05-08T00:00:00Z"
    },
    "end": {
      "$date": "2018-05-14T23:59:59.999Z"
    }
  },
  {
    "totalQty": 750,
    "count": 15,
    "start": {
      "$date": "2018-05-15T00:00:00Z"
    },
    "end": {
      "$date": "2018-05-21T23:59:59.999Z"
    }
  },
  {
    "totalQty": 493,
    "count": 11,
    "start": {
      "$date": "2018-05-22T00:00:00Z"
    },
    "end": {
      "$date": "2018-05-28T23:59:59.999Z"
    }
  },
  {
    "totalQty": 183,
    "count": 5,
    "start": {
      "$date": "2018-05-29T00:00:00Z"
    },
    "end": {
      "$date": "2018-05-31T23:59:59.999Z"
    }
  }
]
Cond Group
[
  {
    "totalQty": 489,
    "count": 11,
    "start": {
      "$date": "2018-05-01T00:00:00Z"
    },
    "end": {
      "$date": "2018-05-07T23:59:59.999Z"
    }
  },
  {
    "totalQty": 751,
    "count": 13,
    "start": {
      "$date": "2018-05-08T00:00:00Z"
    },
    "end": {
      "$date": "2018-05-14T23:59:59.999Z"
    }
  },
  {
    "totalQty": 750,
    "count": 15,
    "start": {
      "$date": "2018-05-15T00:00:00Z"
    },
    "end": {
      "$date": "2018-05-21T23:59:59.999Z"
    }
  },
  {
    "totalQty": 493,
    "count": 11,
    "start": {
      "$date": "2018-05-22T00:00:00Z"
    },
    "end": {
      "$date": "2018-05-28T23:59:59.999Z"
    }
  },
  {
    "totalQty": 183,
    "count": 5,
    "start": {
      "$date": "2018-05-29T00:00:00Z"
    },
    "end": {
      "$date": "2018-05-31T23:59:59.999Z"
    }
  }
]

Optional JavaScript demo

Since some of the approach above is rather "pythonic", for the wider audience of JavaScript brains common to the subject matter, the same thing would look like:

const { Schema } = mongoose = require('mongoose');
const moment = require('moment');

const uri = 'mongodb://localhost/test';

mongoose.Promise = global.Promise;
//mongoose.set('debug',true);

const saleSchema = new Schema({
  date: Date,
  qty: Number
})

const Sale = mongoose.model('Sale', saleSchema);

const log = data => console.log(JSON.stringify(data, undefined, 2));

(async function() {

  try {

    const conn = await mongoose.connect(uri);

    let start = new Date("2018-05-01");
    let end = new Date("2018-06-01");
    let date = new Date(start.valueOf());

    await Promise.all(Object.entries(conn.models).map(([k,m]) => m.remove()));

    let batch = [];

    while ( date.valueOf() < end.valueOf() ) {
      let hour = Math.floor(Math.random() * 24) + 1;
      date = new Date(date.valueOf() + (1000 * 60 * 60 * hour));
      if ( date > end )
        date = end;
      let qty = Math.floor(Math.random() * 100) + 1;
      if (date < end)
        batch.push({ date, qty });

      if (batch.length >= 1000) {
        await Sale.insertMany(batch);
        batch = [];
      }
    }

    if (batch.length > 0) {
      await Sale.insertMany(batch);
      batch = [];
    }

    let result = await Sale.aggregate([
      { "$match": { "date": { "$gte": start, "$lt": end } } },
      { "$group": {
        "_id": {
          "year": { "$year": "$date" },
          "week": { "$isoWeek": "$date" }
        },
        "totalQty": { "$sum": "$qty" },
        "count": { "$sum": 1 }
      }},
      { "$sort": { "_id": 1 } }
    ]);

    result = result.map(({ _id: { year, week }, ...r }) =>
      ({
        start: moment.utc([year]).isoWeek(week).startOf('isoWeek').toDate(),
        end: moment.utc([year]).isoWeek(week).endOf('isoWeek').toDate(),
        ...r
      })
    );

    log({ name: 'ISO group', result });

    let cuts = [start];
    date = start;

    while ( date.valueOf() < end.valueOf() ) {
      date = new Date(date.valueOf() + ( 1000 * 60 * 60 * 24 * 7 ));
      if ( date.valueOf() > end.valueOf() ) date = end;
      cuts.push(date);
    }

    let alternate = await Sale.aggregate([
      { "$match": { "date": { "$gte": start, "$lt": end } } },
      { "$bucket": {
        "groupBy": "$date",
        "boundaries": cuts,
        "output": {
          "totalQty": { "$sum": "$qty" },
          "count": { "$sum": 1 }
        }
      }},
      { "$addFields": {
        "_id": "$$REMOVE",
        "start": "$_id",
        "end": {
          "$cond": {
            "if": {
              "$gt": [
                { "$add": [ "$_id", ( 1000 * 60 * 60 * 24 * 7 ) - 1 ] },
                end
              ]
            },
            "then": { "$add": [ end, -1 ] },
            "else": {
              "$add": [ "$_id", ( 1000 * 60 * 60 * 24 * 7 ) - 1 ]
            }
          }
        }
      }}
    ]);
    log({ name: "Bucket group", result: alternate });


    cuts = [start];
    date = start;

    while ( date.valueOf() < end.valueOf() ) {
      date = new Date(date.valueOf() + ( 1000 * 60 * 60 * 24 * 7 ));
      if ( date.valueOf() > end.valueOf() ) date = end;
      if ( date.valueOf() < end.valueOf() )
        cuts.push(date);
    }

    let stack = [];

    for ( let i = cuts.length - 1; i > 0; i-- ) {
      let rec = {
        "$cond": [
          { "$lt": [ "$date", cuts[i] ] },
          cuts[i-1]
        ]
      };

      if ( stack.length === 0 ) {
        rec['$cond'].push(cuts[i])
      } else {
        let lval = stack.pop();
        rec['$cond'].push(lval);
      }

      stack.push(rec);
    }

    let pipeline = [
      { "$group": {
        "_id": stack[0],
        "totalQty": { "$sum": "$qty" },
        "count": { "$sum": 1 }
      }},
      { "$sort": { "_id": 1 } },
      { "$project": {
        "_id": 0,
        "start": "$_id",
        "end": {
          "$cond": {
            "if": {
              "$gt": [
                { "$add": [ "$_id", ( 1000 * 60 * 60 * 24 * 7 ) - 1 ] },
                end
              ]
            },
            "then": { "$add": [ end, -1 ] },
            "else": {
              "$add": [ "$_id", ( 1000 * 60 * 60 * 24 * 7 ) - 1 ]
            }
          }
        },
        "totalQty": 1,
        "count": 1
      }}
    ];

    let older = await Sale.aggregate(pipeline);
    log({ name: "Cond group", result: older });

    mongoose.disconnect();

  } catch(e) {
    console.error(e)
  } finally {
    process.exit()
  }

})()

And similar output of course:

{
  "name": "ISO group",
  "result": [
    {
      "start": "2018-04-30T00:00:00.000Z",
      "end": "2018-05-06T23:59:59.999Z",
      "totalQty": 576,
      "count": 10
    },
    {
      "start": "2018-05-07T00:00:00.000Z",
      "end": "2018-05-13T23:59:59.999Z",
      "totalQty": 707,
      "count": 11
    },
    {
      "start": "2018-05-14T00:00:00.000Z",
      "end": "2018-05-20T23:59:59.999Z",
      "totalQty": 656,
      "count": 12
    },
    {
      "start": "2018-05-21T00:00:00.000Z",
      "end": "2018-05-27T23:59:59.999Z",
      "totalQty": 829,
      "count": 16
    },
    {
      "start": "2018-05-28T00:00:00.000Z",
      "end": "2018-06-03T23:59:59.999Z",
      "totalQty": 239,
      "count": 6
    }
  ]
}
{
  "name": "Bucket group",
  "result": [
    {
      "totalQty": 666,
      "count": 11,
      "start": "2018-05-01T00:00:00.000Z",
      "end": "2018-05-07T23:59:59.999Z"
    },
    {
      "totalQty": 727,
      "count": 12,
      "start": "2018-05-08T00:00:00.000Z",
      "end": "2018-05-14T23:59:59.999Z"
    },
    {
      "totalQty": 647,
      "count": 12,
      "start": "2018-05-15T00:00:00.000Z",
      "end": "2018-05-21T23:59:59.999Z"
    },
    {
      "totalQty": 743,
      "count": 15,
      "start": "2018-05-22T00:00:00.000Z",
      "end": "2018-05-28T23:59:59.999Z"
    },
    {
      "totalQty": 224,
      "count": 5,
      "start": "2018-05-29T00:00:00.000Z",
      "end": "2018-05-31T23:59:59.999Z"
    }
  ]
}
{
  "name": "Cond group",
  "result": [
    {
      "totalQty": 666,
      "count": 11,
      "start": "2018-05-01T00:00:00.000Z",
      "end": "2018-05-07T23:59:59.999Z"
    },
    {
      "totalQty": 727,
      "count": 12,
      "start": "2018-05-08T00:00:00.000Z",
      "end": "2018-05-14T23:59:59.999Z"
    },
    {
      "totalQty": 647,
      "count": 12,
      "start": "2018-05-15T00:00:00.000Z",
      "end": "2018-05-21T23:59:59.999Z"
    },
    {
      "totalQty": 743,
      "count": 15,
      "start": "2018-05-22T00:00:00.000Z",
      "end": "2018-05-28T23:59:59.999Z"
    },
    {
      "totalQty": 224,
      "count": 5,
      "start": "2018-05-29T00:00:00.000Z",
      "end": "2018-05-31T23:59:59.999Z"
    }
  ]
}

Upvotes: 2
