EdWood

Reputation: 907

MongoDB Change Streams on bulkWrite()

I am in a situation where I change one document a lot. I have several updates which I send at once with bulkWrite(). I would like to limit the number of change stream notifications produced by a bulkWrite(). If I do 5 updates on a document, the change stream notifies 5 times. It would be nice to get only one notification after all those updates, once the bulkWrite() is done. Is there a way to achieve that?

Upvotes: 1

Views: 1468

Answers (2)

m.spyratos

Reputation: 4239

I had a different situation, but one similar to yours: I update a document the moment a user changes a field in a form, but in the change stream I want to listen only for the final updated document.

The way I went about it was with a debounce.

The general idea is to store a unique representation of each stream event (a combination of collection, operation and document) and delay acting on it with a timeout. If another event with the same unique id comes in before the timeout fires, you cancel the previous timeout and create a new one.

Here's a simple example for update operations:

const streamMap = new Map();

const debounceStreamExecution = (next, fn) => {

    // This could be your own unique representation of a stream;
    // here it is the operation type plus the document id.
    const streamId = `${next.operationType}_${next.documentKey._id.toString()}`;
    const streamItem = streamMap.get(streamId);

    if (streamItem && streamItem.timeout) {
        // The same stream came in before, so cancel the pending execution.
        clearTimeout(streamItem.timeout);
    }

    // We merge previous updates with the new ones.
    // Note: streamItem is undefined the first time a stream comes in.
    const updateDescription = {
        updatedFields: {
            // Use Lodash _.merge() if you want deep merging
            ...((streamItem && streamItem.updateDescription.updatedFields) || {}),
            ...(next.updateDescription.updatedFields || {})
        },
        removedFields: [].concat(
            (streamItem && streamItem.updateDescription.removedFields) || [],
            next.updateDescription.removedFields || []
        )
    };

    // We set our timeout to delay execution until the stream goes quiet.
    const timeout = setTimeout(() => {
        streamMap.delete(streamId);
        fn({
            ...next,
            updateDescription
        });
    }, 1000 /* <-- your delay preference */);

    // We store this stream's information.
    streamMap.set(streamId, {
        updateDescription,
        timeout
    });

};

db.collection('aCollection').watch().on('change', next => {
    debounceStreamExecution(next, (result) => {
        console.log('Single stream', result);
    });
});
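The merge step inside the debounce can be exercised on its own, without a live database. Below is a sketch: mergeUpdateDescriptions is our own helper name, and the event objects only imitate the shape of the driver's updateDescription field.

```javascript
// Combine the updateDescription of an earlier change event with a later one:
// shallow merge of updatedFields (later events win), concatenated removedFields.
function mergeUpdateDescriptions(prev, next) {
    return {
        updatedFields: {
            ...((prev && prev.updatedFields) || {}),
            ...((next && next.updatedFields) || {})
        },
        removedFields: [].concat(
            (prev && prev.removedFields) || [],
            (next && next.removedFields) || []
        )
    };
}

const merged = mergeUpdateDescriptions(
    { updatedFields: { a: 1, b: 1 }, removedFields: ['x'] },
    { updatedFields: { b: 2 }, removedFields: ['y'] }
);
console.log(merged);
// { updatedFields: { a: 1, b: 2 }, removedFields: [ 'x', 'y' ] }
```

Because later events overwrite earlier ones per field, the final debounced notification reflects the last value each field was set to during the burst.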

Upvotes: 1

andranikasl

Reputation: 1342

Please check the MongoDB documentation (https://docs.mongodb.com/manual/core/bulk-write-operations/#bulkwrite-methods). If you try something like this:

db.collectionName.bulkWrite(
      [
         { insertOne :
            {
               "document" :
               {
                  "char" : "Dithras", "class" : "barbarian", "lvl" : 4
               }
            }
         },
         { insertOne :
            {
               "document" :
               {
                  "char" : "Taeln", "class" : "fighter", "lvl" : 3
               }
            }
         },
         { updateOne :
            {
               "filter" : { "char" : "Taeln" },
               "update" : { $set : { "status" : "Critical Injury" } }
            }
         },
         { deleteOne :
            { "filter" : { "char" : "Brisbane"} }
         },
         { replaceOne :
            {
               "filter" : { "char" : "Meldane" },
               "replacement" : { "char" : "Tanys", "class" : "oracle", "lvl" : 4 }
            }
         }
      ]
   );

you will get only one result back from the call:

{
    "acknowledged" : true,
    "deletedCount" : 0,
    "insertedCount" : 2,
    "matchedCount" : 1,
    "upsertedCount" : 0,
    "insertedIds" : {
        "0" : ObjectId("5bdfe0a5bad3d851a4064080"),
        "1" : ObjectId("5bdfe0a5bad3d851a4064081")
    },
    "upsertedIds" : {

    }
}

If I can do anything else, feel free to ask :).

Upvotes: 1
