Reputation: 119
I'd like to use MongoDB streams to trigger an event each time a specific kind of data is inserted into a collection.
I've already found something roughly similar to what I'm looking for but that's only working for change streams and not insertions.
Any idea of how I can get that done?
I'm using Mongodb driver with Nodejs to get that done so my code would be something like that:
const MongoClient = require('mongodb').MongoClient;
const uri = 'mongodb://localhost:27017/?replicaSet=rs0';
MongoClient.connect(uri, function(err, client) {
const db = client.db('mydb');
// Connect using MongoClient
var filter = [{
$match: {
$or: [
{ $or: [{"receipt.receiver": "newdexpocket"}, {"act.account": "newdexpocket"}] }]
}
}];
var options = { fullDocument: 'updateLookup' };
db.collection('somecollection').watch(filter, options).on('create', data =>
{
console.log(data);
});
});
operationType
in the filter?updateLookup
isn't the right tool, what should I use?on
event? I've used create
but I'm not even sure it exists, does it?Sorry about all these questions but I'm struggling to find some answers in the official doc.
SOLUTION:
Beware not to forget fullDocument
in your request ;-)
function watch_insert(con, db, coll) {
console.log(new Date() + ' watching: ' + coll);
const insert_pipeline = [ { $match:
{
operationType: 'insert',
$or: [
{ "fullDocument.receipt.receiver": "newdexpocket" },
{ "fullDocument.act.account": "newdexpocket" }
]
}
}];
con.db(db).collection(coll).watch(insert_pipeline)
.on('change', data => {
console.log(data)
});
}
async function run(uri) {
try {
con = await MongoClient.connect(uri, {"useNewUrlParser": true});
watch_insert(con, 'EOS', 'action_traces');
} catch (err) {
console.log(err);
}
}
Upvotes: 2
Views: 3745
Reputation: 919
You can adapt it to your variant by looking for a solution to some points
Stop mongo service: brew services stop mongodb-community
Edit /opt/homebrew/etc/mongod.conf
Add this line:
replication:
replSetName: "rs0"
Save file and open mongo shell: mongo
Run this command: rs.initiate()
Exit from mongo shell: exit
start mongo services: brew services start mongodb-community
In your mongo client configuration, add this option: replicaSet=rs0
And "The $changeStream stage is only supported on replica sets because I'm not using a replicaSet
" error will go away!
Upvotes: 0
Reputation: 13775
You would need to:
operationType: 'insert'
. Since you don't want to monitor updates, you don't need updateLookup
.operationType
.watch()
. An example output is in the Change Events page.watch()
returns a ChangeStream
. It fires close
, change
, end
, and error
events. See ChangeStream for more details.
Here's a full example of a changestream that listens on insert
operation on the database test
collection test
. It will output documents that has the field {a: 1}
('fullDocument.a': 1
) and will ignore updates, inserts of other values of a
, or anything without the field a
.
const MongoClient = require('mongodb').MongoClient
const uri = 'mongodb://localhost:27017/test?replicaSet=replset'
const insert_pipeline = [
{$match: {operationType: 'insert', 'fullDocument.a': 1}}
]
function watch_insert(con, db, coll) {
console.log(new Date() + ' watching: ' + coll)
con.db(db).collection(coll).watch(insert_pipeline)
.on('change', data => {
console.log(data)
})
}
async function run() {
con = await MongoClient.connect(uri, {"useNewUrlParser": true})
watch_insert(con, 'test', 'test')
}
run()
Upvotes: 5