Reputation: 95
Currently I am dealing with a large set of data stored in MongoDB (a single collection of 2M documents, taken from a larger collection of 20M). Fields: id, item name, item type, item description and date.
I want to dynamically count how many items occur within weekly and monthly date spans across the entire collection, e.g. from 2014-01-01 to 2014-01-07 there are 20 items, from 2014-01-08 to 2014-01-16 there are 50 items, etc.
Using Python, how can I accomplish this? Are there libraries for this, or would it be custom code?
Alternatively, should this all be done through MongoDB?
Upvotes: 1
Views: 688
Reputation: 151132
The general way is of course to have the database handle aggregation. If you want data from within a "week range" then there are a couple of ways to go about it, just depending on which approach you actually want for your case.
Just demonstrating for the "month of May" by example, you would have something like:
startdate = datetime(2018,5,1)
enddate = datetime(2018,6,1)
result = db.sales.aggregate([
{ '$match': { 'date': { '$gte': startdate, '$lt': enddate } } },
{ '$group': {
'_id': {
'year': { '$year': '$date' },
'week': { '$isoWeek': '$date' }
},
'totalQty': { '$sum': '$qty' },
'count': { '$sum': 1 }
}},
{ '$sort': { '_id': 1 } }
])
It's a fairly simple invocation using the $year and $isoWeek operators, or possibly even $week, depending on what your MongoDB version actually supports. All you need to do is specify these within the _id grouping key of $group and then choose other accumulators like $sum depending on what you actually need to "accumulate" within that grouping.
The $week and $isoWeek results just come out slightly differently, where the latter is more aligned with functions of the isoweek library for Python and similar things in other languages. In general you can probably convert between the two by adding 1 to the $week value. See the documentation for more detail.
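For instance, on an older deployment that lacks $isoWeek, a minimal sketch of the same grouping using $week might look like this (assuming the same db, startdate and enddate as in the snippet above; $week numbers Sunday-based weeks from 0, so the ranges will not line up exactly with ISO weeks):
result = db.sales.aggregate([
    { '$match': { 'date': { '$gte': startdate, '$lt': enddate } } },
    { '$group': {
        '_id': {
            'year': { '$year': '$date' },
            # Sunday-based week number, starting at 0
            'week': { '$week': '$date' }
        },
        'totalQty': { '$sum': '$qty' },
        'count': { '$sum': 1 }
    }},
    { '$sort': { '_id': 1 } }
])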
In this case you can optionally just let the database do the "aggregation" work and then work out your "desired dates" based on the output, i.e. in Python you can transform the result with datetime values corresponding to each week as:
result = list(result)
for item in result:
item.update({
'start': datetime.combine(
Week(item['_id']['year'],item['_id']['week']).monday(),
datetime.min.time()
),
'end': datetime.combine(
Week(item['_id']['year'],item['_id']['week']).sunday(),
datetime.max.time()
)
})
item.pop('_id',None)
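Alternatively, if you would rather not add the isoweek dependency, the same conversion can be done with the standard library on Python 3.8+ via date.fromisocalendar. A sketch of that variant, meant to be used in place of the loop above:
from datetime import date, datetime

result = list(result)
for item in result:
    year, week = item['_id']['year'], item['_id']['week']
    item.update({
        # ISO weekday 1 is Monday, 7 is Sunday
        'start': datetime.combine(date.fromisocalendar(year, week, 1), datetime.min.time()),
        'end': datetime.combine(date.fromisocalendar(year, week, 7), datetime.max.time())
    })
    item.pop('_id', None)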
If sticking to the ISO standards is not for you, then the alternate approach is to define your own "intervals" on which to accumulate the "grouping". The main tool with MongoDB here is $bucket, plus a little list processing beforehand:
cuts = [startdate]
date = startdate
while ( date < enddate ):
date = date + timedelta(days=7)
if ( date > enddate ):
date = enddate
cuts.append(date)
alternate = db.sales.aggregate([
{ '$match': { 'date': { '$gte': startdate, '$lt': enddate } } },
{ '$bucket': {
'groupBy': '$date',
'boundaries': cuts,
'output': {
'totalQty': { '$sum': '$qty' },
'count': { '$sum': 1 }
}
}},
{ '$project': {
'_id': 0,
'start': '$_id',
'end': {
'$cond': {
'if': {
'$gt': [
{ '$add': ['$_id', (1000 * 60 * 60 * 24 * 7) - 1] },
enddate
]
},
'then': { '$add': [ enddate, -1 ] },
'else': {
'$add': ['$_id', (1000 * 60 * 60 * 24 * 7) - 1]
}
}
},
'totalQty': 1,
'count': 1
}}
])
Instead of using predefined functions such as $week or $isoWeek, here we work out the "intervals of 7 days" from a given query start date and produce an array of those intervals, always of course ending with the "maximum" value from the range of data being selected.
This list is then passed as the "boundaries" option to the $bucket aggregation stage. It is really just a list of values which tells the stage what to accumulate "up to" for each "grouping" produced.
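To make that concrete, for the May 2018 range used throughout these examples the loop above works out to the following boundary values (the final interval is simply capped at enddate, so it is shorter than a week):
cuts = [
    datetime(2018, 5, 1),
    datetime(2018, 5, 8),
    datetime(2018, 5, 15),
    datetime(2018, 5, 22),
    datetime(2018, 5, 29),
    datetime(2018, 6, 1)
]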
The stage itself is really just a "shorthand" implementation of the $switch aggregation operator within a $group pipeline stage. Both of those operators require MongoDB 3.4, but you can actually do the same thing using $cond within a $group by nesting each else condition for each "boundary" value. It's possible, but a bit more involved, and you really should be using MongoDB 3.4 as a minimum version by now anyway.
If you find you really must, though, a $cond within $group version is added to the examples below, showing how to transform that same cuts list into such a statement. That means you can do essentially the same thing all the way back to MongoDB 2.2, where the aggregation framework was introduced.
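Just to illustrate the shape, with three hypothetical boundary values the list-to-$cond transform in the listing below generates a nested expression along these lines (illustration only, not output copied from the code):
from datetime import datetime

# Hypothetical boundaries, purely for illustration
c0, c1, c2 = datetime(2018, 5, 1), datetime(2018, 5, 8), datetime(2018, 5, 15)

# Equivalent of stack[0] for cuts == [c0, c1, c2]: each document's date is
# tested against the upper boundaries in turn and mapped to the lower
# boundary of the interval it falls into.
group_id = {
    '$cond': [
        { '$lt': [ '$date', c1 ] },
        c0,
        { '$cond': [
            { '$lt': [ '$date', c2 ] },
            c1,
            c2
        ]}
    ]
}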
As a full example, consider the following listing, which inserts a month's worth of random data and then runs both of the presented aggregation options over it:
from random import randint
from datetime import datetime, timedelta, date
from isoweek import Week
from pymongo import MongoClient
from bson.json_util import dumps, JSONOptions
import bson.json_util
client = MongoClient()
db = client.test
db.sales.delete_many({})
startdate = datetime(2018,5,1)
enddate = datetime(2018,6,1)
currdate = startdate
batch = []
while ( currdate < enddate ):
currdate = currdate + timedelta(hours=randint(1,24))
if ( currdate > enddate ):
currdate = enddate
qty = randint(1,100);
if ( currdate < enddate ):
batch.append({ 'date': currdate, 'qty': qty })
if ( len(batch) >= 1000 ):
db.sales.insert_many(batch)
batch = []
if ( len(batch) > 0):
db.sales.insert_many(batch)
batch = []
result = db.sales.aggregate([
{ '$match': { 'date': { '$gte': startdate, '$lt': enddate } } },
{ '$group': {
'_id': {
'year': { '$year': '$date' },
'week': { '$isoWeek': '$date' }
},
'totalQty': { '$sum': '$qty' },
'count': { '$sum': 1 }
}},
{ '$sort': { '_id': 1 } }
])
result = list(result)
for item in result:
item.update({
'start': datetime.combine(
Week(item['_id']['year'],item['_id']['week']).monday(),
datetime.min.time()
),
'end': datetime.combine(
Week(item['_id']['year'],item['_id']['week']).sunday(),
datetime.max.time()
)
})
item.pop('_id',None)
print("Week grouping")
print(
dumps(result,indent=2,
json_options=JSONOptions(datetime_representation=2)))
cuts = [startdate]
date = startdate
while ( date < enddate ):
date = date + timedelta(days=7)
if ( date > enddate ):
date = enddate
cuts.append(date)
alternate = db.sales.aggregate([
{ '$match': { 'date': { '$gte': startdate, '$lt': enddate } } },
{ '$bucket': {
'groupBy': '$date',
'boundaries': cuts,
'output': {
'totalQty': { '$sum': '$qty' },
'count': { '$sum': 1 }
}
}},
{ '$project': {
'_id': 0,
'start': '$_id',
'end': {
'$cond': {
'if': {
'$gt': [
{ '$add': ['$_id', (1000 * 60 * 60 * 24 * 7) - 1] },
enddate
]
},
'then': { '$add': [ enddate, -1 ] },
'else': {
'$add': ['$_id', (1000 * 60 * 60 * 24 * 7) - 1]
}
}
},
'totalQty': 1,
'count': 1
}}
])
alternate = list(alternate)
print("Bucket grouping")
print(
dumps(alternate,indent=2,
json_options=JSONOptions(datetime_representation=2)))
cuts = [startdate]
date = startdate
while ( date < enddate ):
date = date + timedelta(days=7)
if ( date > enddate ):
date = enddate
if ( date < enddate ):
cuts.append(date)
stack = []
for i in range(len(cuts)-1,0,-1):
rec = {
'$cond': [
{ '$lt': [ '$date', cuts[i] ] },
cuts[i-1]
]
}
if ( len(stack) == 0 ):
rec['$cond'].append(cuts[i])
else:
lval = stack.pop()
rec['$cond'].append(lval)
stack.append(rec)
pipeline = [
{ '$match': { 'date': { '$gt': startdate, '$lt': enddate } } },
{ '$group': {
'_id': stack[0],
'totalQty': { '$sum': '$qty' },
'count': { '$sum': 1 }
}},
{ '$sort': { '_id': 1 } },
{ '$project': {
'_id': 0,
'start': '$_id',
'end': {
'$cond': {
'if': {
'$gt': [
{ '$add': [ '$_id', ( 1000 * 60 * 60 * 24 * 7 ) - 1 ] },
enddate
]
},
'then': { '$add': [ enddate, -1 ] },
'else': {
'$add': [ '$_id', ( 1000 * 60 * 60 * 24 * 7 ) - 1 ]
}
}
},
'totalQty': 1,
'count': 1
}}
]
#print(
# dumps(pipeline,indent=2,
# json_options=JSONOptions(datetime_representation=2)))
older = db.sales.aggregate(pipeline)
older = list(older)
print("Cond Group")
print(
dumps(older,indent=2,
json_options=JSONOptions(datetime_representation=2)))
With output:
Week grouping
[
{
"totalQty": 449,
"count": 9,
"start": {
"$date": "2018-04-30T00:00:00Z"
},
"end": {
"$date": "2018-05-06T23:59:59.999Z"
}
},
{
"totalQty": 734,
"count": 14,
"start": {
"$date": "2018-05-07T00:00:00Z"
},
"end": {
"$date": "2018-05-13T23:59:59.999Z"
}
},
{
"totalQty": 686,
"count": 14,
"start": {
"$date": "2018-05-14T00:00:00Z"
},
"end": {
"$date": "2018-05-20T23:59:59.999Z"
}
},
{
"totalQty": 592,
"count": 12,
"start": {
"$date": "2018-05-21T00:00:00Z"
},
"end": {
"$date": "2018-05-27T23:59:59.999Z"
}
},
{
"totalQty": 205,
"count": 6,
"start": {
"$date": "2018-05-28T00:00:00Z"
},
"end": {
"$date": "2018-06-03T23:59:59.999Z"
}
}
]
Bucket grouping
[
{
"totalQty": 489,
"count": 11,
"start": {
"$date": "2018-05-01T00:00:00Z"
},
"end": {
"$date": "2018-05-07T23:59:59.999Z"
}
},
{
"totalQty": 751,
"count": 13,
"start": {
"$date": "2018-05-08T00:00:00Z"
},
"end": {
"$date": "2018-05-14T23:59:59.999Z"
}
},
{
"totalQty": 750,
"count": 15,
"start": {
"$date": "2018-05-15T00:00:00Z"
},
"end": {
"$date": "2018-05-21T23:59:59.999Z"
}
},
{
"totalQty": 493,
"count": 11,
"start": {
"$date": "2018-05-22T00:00:00Z"
},
"end": {
"$date": "2018-05-28T23:59:59.999Z"
}
},
{
"totalQty": 183,
"count": 5,
"start": {
"$date": "2018-05-29T00:00:00Z"
},
"end": {
"$date": "2018-05-31T23:59:59.999Z"
}
}
]
Cond Group
[
{
"totalQty": 489,
"count": 11,
"start": {
"$date": "2018-05-01T00:00:00Z"
},
"end": {
"$date": "2018-05-07T23:59:59.999Z"
}
},
{
"totalQty": 751,
"count": 13,
"start": {
"$date": "2018-05-08T00:00:00Z"
},
"end": {
"$date": "2018-05-14T23:59:59.999Z"
}
},
{
"totalQty": 750,
"count": 15,
"start": {
"$date": "2018-05-15T00:00:00Z"
},
"end": {
"$date": "2018-05-21T23:59:59.999Z"
}
},
{
"totalQty": 493,
"count": 11,
"start": {
"$date": "2018-05-22T00:00:00Z"
},
"end": {
"$date": "2018-05-28T23:59:59.999Z"
}
},
{
"totalQty": 183,
"count": 5,
"start": {
"$date": "2018-05-29T00:00:00Z"
},
"end": {
"$date": "2018-05-31T23:59:59.999Z"
}
}
]
Since some of the approach above is rather "pythonic", here is the same thing for the wider audience of JavaScript brains common to the subject matter:
const { Schema } = mongoose = require('mongoose');
const moment = require('moment');
const uri = 'mongodb://localhost/test';
mongoose.Promise = global.Promise;
//mongoose.set('debug',true);
const saleSchema = new Schema({
date: Date,
qty: Number
})
const Sale = mongoose.model('Sale', saleSchema);
const log = data => console.log(JSON.stringify(data, undefined, 2));
(async function() {
try {
const conn = await mongoose.connect(uri);
let start = new Date("2018-05-01");
let end = new Date("2018-06-01");
let date = new Date(start.valueOf());
await Promise.all(Object.entries(conn.models).map(([k,m]) => m.remove()));
let batch = [];
while ( date.valueOf() < end.valueOf() ) {
let hour = Math.floor(Math.random() * 24) + 1;
date = new Date(date.valueOf() + (1000 * 60 * 60 * hour));
if ( date > end )
date = end;
let qty = Math.floor(Math.random() * 100) + 1;
if (date < end)
batch.push({ date, qty });
if (batch.length >= 1000) {
await Sale.insertMany(batch);
batch = [];
}
}
if (batch.length > 0) {
await Sale.insertMany(batch);
batch = [];
}
let result = await Sale.aggregate([
{ "$match": { "date": { "$gte": start, "$lt": end } } },
{ "$group": {
"_id": {
"year": { "$year": "$date" },
"week": { "$isoWeek": "$date" }
},
"totalQty": { "$sum": "$qty" },
"count": { "$sum": 1 }
}},
{ "$sort": { "_id": 1 } }
]);
result = result.map(({ _id: { year, week }, ...r }) =>
({
start: moment.utc([year]).isoWeek(week).startOf('isoWeek').toDate(),
end: moment.utc([year]).isoWeek(week).endOf('isoWeek').toDate(),
...r
})
);
log({ name: 'ISO group', result });
let cuts = [start];
date = start;
while ( date.valueOf() < end.valueOf() ) {
date = new Date(date.valueOf() + ( 1000 * 60 * 60 * 24 * 7 ));
if ( date.valueOf() > end.valueOf() ) date = end;
cuts.push(date);
}
let alternate = await Sale.aggregate([
{ "$match": { "date": { "$gte": start, "$lt": end } } },
{ "$bucket": {
"groupBy": "$date",
"boundaries": cuts,
"output": {
"totalQty": { "$sum": "$qty" },
"count": { "$sum": 1 }
}
}},
{ "$addFields": {
"_id": "$$REMOVE",
"start": "$_id",
"end": {
"$cond": {
"if": {
"$gt": [
{ "$add": [ "$_id", ( 1000 * 60 * 60 * 24 * 7 ) - 1 ] },
end
]
},
"then": { "$add": [ end, -1 ] },
"else": {
"$add": [ "$_id", ( 1000 * 60 * 60 * 24 * 7 ) - 1 ]
}
}
}
}}
]);
log({ name: "Bucket group", result: alternate });
cuts = [start];
date = start;
while ( date.valueOf() < end.valueOf() ) {
date = new Date(date.valueOf() + ( 1000 * 60 * 60 * 24 * 7 ));
if ( date.valueOf() > end.valueOf() ) date = end;
if ( date.valueOf() < end.valueOf() )
cuts.push(date);
}
let stack = [];
for ( let i = cuts.length - 1; i > 0; i-- ) {
let rec = {
"$cond": [
{ "$lt": [ "$date", cuts[i] ] },
cuts[i-1]
]
};
if ( stack.length === 0 ) {
rec['$cond'].push(cuts[i])
} else {
let lval = stack.pop();
rec['$cond'].push(lval);
}
stack.push(rec);
}
let pipeline = [
{ "$group": {
"_id": stack[0],
"totalQty": { "$sum": "$qty" },
"count": { "$sum": 1 }
}},
{ "$sort": { "_id": 1 } },
{ "$project": {
"_id": 0,
"start": "$_id",
"end": {
"$cond": {
"if": {
"$gt": [
{ "$add": [ "$_id", ( 1000 * 60 * 60 * 24 * 7 ) - 1 ] },
end
]
},
"then": { "$add": [ end, -1 ] },
"else": {
"$add": [ "$_id", ( 1000 * 60 * 60 * 24 * 7 ) - 1 ]
}
}
},
"totalQty": 1,
"count": 1
}}
];
let older = await Sale.aggregate(pipeline);
log({ name: "Cond group", result: older });
mongoose.disconnect();
} catch(e) {
console.error(e)
} finally {
process.exit()
}
})()
And similar output of course:
{
"name": "ISO group",
"result": [
{
"start": "2018-04-30T00:00:00.000Z",
"end": "2018-05-06T23:59:59.999Z",
"totalQty": 576,
"count": 10
},
{
"start": "2018-05-07T00:00:00.000Z",
"end": "2018-05-13T23:59:59.999Z",
"totalQty": 707,
"count": 11
},
{
"start": "2018-05-14T00:00:00.000Z",
"end": "2018-05-20T23:59:59.999Z",
"totalQty": 656,
"count": 12
},
{
"start": "2018-05-21T00:00:00.000Z",
"end": "2018-05-27T23:59:59.999Z",
"totalQty": 829,
"count": 16
},
{
"start": "2018-05-28T00:00:00.000Z",
"end": "2018-06-03T23:59:59.999Z",
"totalQty": 239,
"count": 6
}
]
}
{
"name": "Bucket group",
"result": [
{
"totalQty": 666,
"count": 11,
"start": "2018-05-01T00:00:00.000Z",
"end": "2018-05-07T23:59:59.999Z"
},
{
"totalQty": 727,
"count": 12,
"start": "2018-05-08T00:00:00.000Z",
"end": "2018-05-14T23:59:59.999Z"
},
{
"totalQty": 647,
"count": 12,
"start": "2018-05-15T00:00:00.000Z",
"end": "2018-05-21T23:59:59.999Z"
},
{
"totalQty": 743,
"count": 15,
"start": "2018-05-22T00:00:00.000Z",
"end": "2018-05-28T23:59:59.999Z"
},
{
"totalQty": 224,
"count": 5,
"start": "2018-05-29T00:00:00.000Z",
"end": "2018-05-31T23:59:59.999Z"
}
]
}
{
"name": "Cond group",
"result": [
{
"totalQty": 666,
"count": 11,
"start": "2018-05-01T00:00:00.000Z",
"end": "2018-05-07T23:59:59.999Z"
},
{
"totalQty": 727,
"count": 12,
"start": "2018-05-08T00:00:00.000Z",
"end": "2018-05-14T23:59:59.999Z"
},
{
"totalQty": 647,
"count": 12,
"start": "2018-05-15T00:00:00.000Z",
"end": "2018-05-21T23:59:59.999Z"
},
{
"totalQty": 743,
"count": 15,
"start": "2018-05-22T00:00:00.000Z",
"end": "2018-05-28T23:59:59.999Z"
},
{
"totalQty": 224,
"count": 5,
"start": "2018-05-29T00:00:00.000Z",
"end": "2018-05-31T23:59:59.999Z"
}
]
}
Upvotes: 2