Reputation: 907
I am in a situation where I change one document a lot. I have several updates which I send in once by bulkWrite(). I would like to limit the number of notifications from a change stream based on bulkWrite(). If I do 5x updates on a document, then change stream will notify 5x. It would be nice to get only one notification after all those updates when bulkWrite() is done. Is there a way how to achieve that?
Upvotes: 1
Views: 1468
Reputation: 4239
I had a different situation, but similar to yours. I'm updating a document the moment a user changes a field in a form, but in the streams I would like to listen only to the final updated document.
The way I went about it was with a debounce.
The general idea is to store a unique representation of the stream (combination of collection, operation and document), and then have a timeout before you use that stream. If another stream comes in with the same unique id representation, you cancel the previous timeout and create a new one.
Here's a simple example for the update
operations:
const streamMap = new Map();
const debounceStreamExecution = (next, fn) => {
const streamId = `${next.operationType}_${next.documentKey._id.toString()}` // This could be your own unique representation of a stream.
const streamItem = streamMap.get(streamId);
if (streamItem && streamItem.timeout) {
// Same stream came before, so stop previous execution.
clearTimeout(streamItem.timeout);
}
// We merge previous updates with new ones
const updateDescription = {
updatedFields: {
// Use Lodash _.merge() if you want deep merging
...streamItem.updateDescription.updatedFields || {},
...next.updateDescription.updatedFields || {}
},
removedFields: [].concat(
streamItem.updateDescription.removedFields || [],
next.updateDescription.removedFields || []
)
};
// We set our timeout to delay execution
const timeout = setTimeout(() => {
streamMap.delete(streamId);
fn({
...next,
updateDescription
});
}, 1000 /* <-- Your delay preference */);
// We store this stream information.
streamMap.set(streamId, {
updateDescription,
timeout
});
};
db.collection('aCollection').watch().on('change', next => {
debounceStreamExecution(next, (result) => {
console.log('Single stream', result);
});
});
Upvotes: 1
Reputation: 1342
Please check MongoDB documentation https://docs.mongodb.com/manual/core/bulk-write-operations/#bulkwrite-methods
if you try to do something like this:
db.collectionName.bulkWrite(
[
{ insertOne :
{
"document" :
{
"char" : "Dithras", "class" : "barbarian", "lvl" : 4
}
}
},
{ insertOne :
{
"document" :
{
"char" : "Taeln", "class" : "fighter", "lvl" : 3
}
}
},
{ updateOne :
{
"filter" : { "char" : "Taeln" },
"update" : { $set : { "status" : "Critical Injury" } }
}
},
{ deleteOne :
{ "filter" : { "char" : "Brisbane"} }
},
{ replaceOne :
{
"filter" : { "char" : "Meldane" },
"replacement" : { "char" : "Tanys", "class" : "oracle", "lvl" : 4 }
}
}
]
);
you will get only one result
{
"acknowledged" : true,
"deletedCount" : 0,
"insertedCount" : 2,
"matchedCount" : 1,
"upsertedCount" : 0,
"insertedIds" : {
"0" : ObjectId("5bdfe0a5bad3d851a4064080"),
"1" : ObjectId("5bdfe0a5bad3d851a4064081")
},
"upsertedIds" : {
}
}
If I can do anything else feel free to ask:).
Upvotes: 1