Reputation: 7990
Is there a way in MongoDB to identify (so as to delete) all but the most recent N records per group?
We have a MongoDB database in which we keep "docs" and "logs" collections. Each log keeps a reference to a docId and a date:
// doc
{ _id: ObjectId }
// log
{ docId: String, date: Date }
The log DB is getting huge. I'd like to drop all logs except those that are either:

- newer than 30 days, or
- among the most recent N logs for their document
I know how to drop all logs older than 30 days. But I don't know how to keep the most recent N logs for a document.
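For reference, the 30-day cleanup I already know how to do looks roughly like this in the shell:

// Remove every log older than 30 days, regardless of how many remain per docId
var cutoff = new Date(Date.now() - (1000 * 60 * 60 * 24 * 30));
db.log.deleteMany({ date: { $lt: cutoff } });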
Upvotes: 2
Views: 1152
Reputation: 151132
This cannot be done in a single statement. What you can do is essentially identify the "last 12" for each possible "docId" value and then exclude those documents from being removed by adding that list to $nin when you issue the request to remove the documents.
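To make the idea concrete, here is a minimal sketch of that two-step process for a single docId value, written for the mongo shell (the "some-doc-id" value is just a placeholder):

// Cutoff for "older than 30 days"
var cutoff = new Date(Date.now() - (1000 * 60 * 60 * 24 * 30));

// 1. Collect the _id values of the 12 most recent logs for this docId
var keep = db.log.find({ docId: "some-doc-id" }, { _id: 1 })
  .sort({ date: -1 }).limit(12)
  .toArray().map(function(d) { return d._id; });

// 2. Remove everything else for that docId that is older than the cutoff
db.log.deleteMany({
  docId: "some-doc-id",
  _id: { $nin: keep },
  date: { $lt: cutoff }
});

The process below does exactly this, but for every document in the "doc" collection and with the removals batched together.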
You don't specify a preferred programming environment, but here is a general process using Node.js:
const MongoClient = require('mongodb').MongoClient;
const uri = 'mongodb://localhost/test';

(async function() {

  let db;

  // Calculate date cutoff for "older than 30 days"
  let oneDay = 1000 * 60 * 60 * 24,
      thirtyDays = oneDay * 30,
      now = Date.now(),
      cutoff = new Date(
        ( now - ( now % oneDay ) ) - thirtyDays
      );

  try {

    db = await MongoClient.connect(uri);

    let log = db.collection('log');
    let doc = db.collection('doc');

    await new Promise((resolve, reject) => {

      let ops = [];
      let stream = doc.find();

      stream.on('error', reject);

      stream.on('end', async () => {
        // Commit any remaining batched operations
        if ( ops.length > 0 ) {
          await log.bulkWrite(ops);
          ops = [];
        }
        resolve();
      });

      stream.on('data', async (data) => {
        // Pause processing input stream
        stream.pause();

        // Get the _id values of the last 12 log entries for this doc
        let last = await log.find({ docId: data._id })
          .project({ _id: 1 })
          .sort({ date: -1 }).limit(12)
          .map(d => d._id)
          .toArray();

        // Queue a deleteMany that removes logs older than the cutoff,
        // excluding the 12 most recent entries just identified
        ops.push({
          deleteMany: {
            filter: {
              _id: { $nin: last },
              docId: data._id,
              date: { $lt: cutoff }
            }
          }
        });

        // Send the batch once it reaches 1000 entries
        if ( ops.length >= 1000 ) {
          await log.bulkWrite(ops);
          ops = [];
        }

        // Resume processing input stream
        stream.resume();
      });

    });

  } catch(e) {
    console.error(e);
  } finally {
    if (db) db.close();
  }

})();
The basic premise here is that you loop through the documents in the "doc" collection and then perform a query against the "log" collection in order to return the 12 most recent documents. We then make a list of the _id values from each of those documents found, if any.
The point of doing this is that the next step is to issue a deleteMany operation on the database. Because there are going to be a lot of these, we use .bulkWrite() instead of issuing a request every time we iterate a source document. This cuts down the network traffic and delay considerably.
The basic statement, then, is to remove all documents where the "docId" matches the current document from the source cursor and where the date is older than the 30-day cutoff. The additional criterion uses $nin to "exclude" any documents that were identified as being in the "most recent 12" by the previous query. This makes sure those documents are always retained, since they are excluded from removal.
ops.push({
  deleteMany: {
    filter: {
      _id: { $nin: last },
      docId: data._id,
      date: { $lt: cutoff }
    }
  }
});
And that is all there really is to it. The rest of the processing is about accumulating the "batch" until there are 1000 entries (a reasonable size, but anything under the 16MB BSON limit for a request is possible), at which point the actual request is sent to the server to process and actually remove the documents.

When the cursor is exhausted, the process is complete and any remaining "batched" instructions are committed.
One thing you can get out of the currently "upcoming" release of MongoDB is that it allows a "non-correlated" form of $lookup, which means we can essentially get the "top 12" for each target document in a single request instead of issuing multiple queries. It does this because this form of $lookup takes a "pipeline" as an argument instead of producing fixed output based on matching local and foreign keys. This allows us to $match, $sort and $limit the results returned.
const MongoClient = require('mongodb').MongoClient;
const uri = 'mongodb://localhost/test';

(async function() {

  let db;

  // Calculate date cutoff for "older than 30 days"
  let oneDay = 1000 * 60 * 60 * 24,
      thirtyDays = oneDay * 30,
      now = Date.now(),
      cutoff = new Date(
        ( now - ( now % oneDay ) ) - thirtyDays
      );

  try {

    db = await MongoClient.connect(uri);

    await new Promise((resolve, reject) => {

      let ops = [];

      // Join the "most recent 12" log entries to each doc in a single request
      let stream = db.collection('doc').aggregate([
        { "$lookup": {
          "from": "log",
          "let": { "id": "$_id" },
          "pipeline": [
            { "$match": {
              "$expr": { "$eq": [ "$docId", "$$id" ] }
            }},
            { "$sort": { "date": -1 } },
            { "$limit": 12 },
            { "$project": { "_id": 1 } }
          ],
          "as": "docs"
        }}
      ]);

      stream.on('error', reject);

      stream.on('end', async () => {
        // Commit any remaining batched operations
        if ( ops.length > 0 ) {
          await db.collection('log').bulkWrite(ops);
          ops = [];
        }
        resolve();
      });

      stream.on('data', async (data) => {
        stream.pause();

        // Queue a deleteMany that removes logs older than the cutoff,
        // excluding the 12 most recent entries joined by $lookup
        ops.push({
          deleteMany: {
            filter: {
              _id: { $nin: data.docs.map(d => d._id) },
              docId: data._id,
              date: { $lt: cutoff }
            }
          }
        });

        // Send the batch once it reaches 1000 entries
        if ( ops.length >= 1000 ) {
          await db.collection('log').bulkWrite(ops);
          ops = [];
        }

        stream.resume();
      });

    });

  } catch(e) {
    console.error(e);
  } finally {
    if (db) db.close();
  }

})();
The key to this is $expr, which was only finalized in the 3.5.12 development release. This allows an efficient $match expression, which makes this a viable alternative to issuing separate queries.
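To see what $expr does on its own, here is a hypothetical shell example; the "expires" field does not exist in the schema above and is invented purely to illustrate that $expr can compare two fields of the same document, which ordinary query operators cannot:

// Hypothetical: "expires" is an invented field for illustration only.
// $expr allows an aggregation expression inside a normal query filter,
// here comparing two fields of the same document.
db.log.find({
  "$expr": { "$lt": [ "$date", "$expires" ] }
})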
Of course, you really want to wait for that to be ready for production. But it's good to be aware of it, so you can transition to such a process when you eventually upgrade your underlying MongoDB.
Upvotes: 3