prototype
prototype

Reputation: 7990

Keep N most Recent Records by Group While Removing Others

Is there a way in MongoDB to identify (so as to delete) all but the most recent N records per group?

We have a MongoDB collection in which we keep collections of "docs" and "logs". Each log keeps a reference to a docId and a date:

The log DB is getting huge. I'd like to drop all logs except those that are either:

I know how to drop all logs older than 30 days. But I don't know how to keep the most recent N logs for a document.

Upvotes: 2

Views: 1152

Answers (1)

Neil Lunn
Neil Lunn

Reputation: 151132

In a single statement it cannot be done. What you can do is to essentially identify the "last 12" for each possible "docId" value and then exclude those documents from being removed by adding the list to $nin when you issue the request to remove the documents.

You don't specify a preferred programming environment, but here is a general process using nodejs:

const MongoClient = require('mongodb').MongoClient;

const uri = 'mongodb://localhost/test';

(async function() {

  let db;

  // Calculate date cutoff
  let oneDay = 1000 * 60 * 60 * 24,
      thirtyDays = oneDay * 30,
      now = Date.now(),
      cutoff = new Date(
        ( now - ( now % oneDay ) ) - thirtyDays
      );

  try {

    db = await MongoClient.connect(uri);

    let log = db.collection('log');
    let doc = db.collection('doc');

    await new Promise((resolve,reject) => {

      let ops = [];

      let stream = doc.find();

      stream.on('error', reject);
      stream.on('end', async () => {
        if ( ops.length > 0 ) {
          await log.bulkWrite(ops);
          ops = [];
        }
        resolve();
      });

      stream.on('data', async (data) => {

        // Pause processing input stream
        stream.pause();

        // get last 12 for doc
        let last = await (log.find({ docId: data._id })
          .project({ _id: 1 })
          .sort({ date: -1 }).limit(12)).map(d => d._id);

        ops.push({
          deleteMany: {
            filter: {
              _id: { $nin: last },
              docId: data._id,
              date: { $lt: cutoff }
            }
          }
        });

        if ( ops.length >= 1000 ) {
          await log.bulkWrite(ops);
          ops = [];
        }

        // Resume processing input stream
        stream.resume()

      });

    });

  } catch(e) {
    console.error(e);
  } finally {
    db.close();
  }

})();

The basic premise here is that you loop through the documents in the "doc" collection and then perform a query against the "log" collection in order to return the 12 most recent documents. We then make a list of the _id values from each of those documents found, if any.

The point of doing this is that the next thing to do is issue a deleteMany operation on the database. Because there are going to be a lot of these we are going to use .bulkWrite() instead of issuing the request every time we iterate a source document. This cuts down the network traffic and delay considerably.

The basic statement then is to remove all documents where the "docId" matches the current document from the source in the cursor, and where the date is older than the cutoff point of 30 days.

The additional criteria uses $nin to "exclude" any documents that where identified as being in the "most recent 12" from the previous query. This makes sure that those documents are always retained since they are excluded from removal.

        ops.push({
          deleteMany: {
            filter: {
              _id: { $nin: last },
              docId: data._id,
              date: { $lt: cutoff }
            }
          }
        });

And that is all there really is to it. The rest of the processing is about accumulating the "batch" until there are 1000 entries ( a reasonable size, but anything under the 16MB BSON limit for a request is possible ) when the actual request is sent to the server to process and actually remove the documents.

When the cursor is exhausted the process is complete, and any remaining "batched" instructions are committed.


MongoDB 3.6 Preview

One thing you can get out of the currently "upcoming" release of MongoDB is that it allows a "non-correlated" form of $lookup which means we can essentially get the "top 12" for each target document in a single request instead of issuing multiple queries.

It does this because this form of $lookup takes a "pipeline" as an argument instead of fixed output based on matching of local and foreign keys. This allows us to $match, $sort and $limit the results returned.

const MongoClient = require('mongodb').MongoClient;

const uri = 'mongodb://localhost/test';

(async function() {

  let db;

  // Calculate date cutoff
  let oneDay = 1000 * 60 * 60 * 24,
      thirtyDays = oneDay * 30,
      now = Date.now(),
      cutoff = new Date(
        ( now - ( now % oneDay ) ) - thirtyDays
      );

  try {

    db = await MongoClient.connect(uri);

    await new Promise((resolve,reject) => {

      let ops = [];

      let stream = db.collection('doc').aggregate([
        { "$lookup": {
          "from": "log",
          "let": {
            "id": "$_id"
          },
          "pipeline": [
            { "$match": {
              "docId": { "$eq": { "$expr": "$$id" } }
            }},
            { "$sort": { "date": -1 } },
            { "$limit": 12 },
            { "$project": { "_id": 1 } }
          ],
          "as": 'docs'
        }},
      ]);

      stream.on('error', reject);
      stream.on('end', async () => {
        if ( ops.length > 0 ) {
          await db.collection('log').bulkWrite(ops);
          ops = [];
        }
        resolve();
      });

      stream.on('data', async (data) => {
        stream.pause();

        ops.push({
          deleteMany: {
            filter: {
              _id: { $nin: data.docs.map(d => d._id) },
              docId: data._id,
              date: { $lt: cutoff }
            }
          }
        });

        if ( ops.length >= 1000 ) {
          await db.collection('log').bulkWrite(ops);
          ops = [];
        }
        stream.resume();

      });

    });


  } catch(e) {
    console.error(e);
  } finally {
    db.close();
  }

})();

The key to this is $expr, which was only finalized in the 3.5.12 development release. This allows an efficient $match expression and then makes this a viable alternative to processing separate queries.

Of course, you really want to wait for that to be ready for production. But it's good to be aware of it so you could transition to such a process as you upgrade your underlying MongoDB eventually.

Upvotes: 3

Related Questions