Linda
Linda

Reputation: 119

Watch document creation with MongoDB Streams

I'd like to use MongoDB streams to trigger an event each time a specific kind of data is inserted into a collection.

I've already found something roughly similar to what I'm looking for but that's only working for change streams and not insertions.

Any idea of how I can get that done?

I'm using the MongoDB driver for Node.js, so my code would look something like this:

const MongoClient = require('mongodb').MongoClient;

// Connection string for a local mongod running as replica set "rs0".
const uri = 'mongodb://localhost:27017/?replicaSet=rs0';
// Connect using MongoClient; the callback receives (err, client).
// `err` is not checked before `client` is used.
MongoClient.connect(uri, function(err, client) {

const db = client.db('mydb');

// Pipeline handed to watch(): a single $match that keeps entries whose
// "receipt.receiver" or "act.account" equals "newdexpocket".
// NOTE(review): there is no operationType condition here, and the field paths
// are not prefixed with "fullDocument." — confirm against the change event shape.
var filter = [{
    $match: {
        $or: [
            { $or: [{"receipt.receiver": "newdexpocket"}, {"act.account": "newdexpocket"}] }]
    }
}];

// Options handed to watch() as its second argument.
var options = { fullDocument: 'updateLookup' };
// Open the change stream on 'somecollection' and log whatever the listener receives.
// NOTE(review): the listener is registered for 'create'; a ChangeStream is
// documented as firing close, change, end and error — verify this event name.
db.collection('somecollection').watch(filter, options).on('create', data => 
  {
    console.log(data);
  });
});

Sorry about all these questions but I'm struggling to find some answers in the official doc.

SOLUTION:

Beware not to forget fullDocument in your request ;-)

/**
 * Open a change stream on `db.coll` and log every insert event whose inserted
 * document has "receipt.receiver" or "act.account" equal to "newdexpocket".
 *
 * @param {object} con  Connected client; must expose
 *                      `db(name).collection(name).watch(pipeline)`.
 * @param {string} db   Database name.
 * @param {string} coll Collection name.
 * @returns {object} The change stream returned by `watch()`, so the caller
 *                   can close it or attach further listeners.
 */
function watch_insert(con, db, coll) {
  console.log(new Date() + ' watching: ' + coll);

  // The pipeline is applied to change events, so the inserted document's
  // fields are addressed through the "fullDocument." prefix.
  const insert_pipeline = [{
    $match: {
      operationType: 'insert',
      $or: [
        { "fullDocument.receipt.receiver": "newdexpocket" },
        { "fullDocument.act.account": "newdexpocket" }
      ]
    }
  }];

  const changeStream = con.db(db).collection(coll).watch(insert_pipeline);

  changeStream.on('change', data => {
    console.log(data);
  });

  // An emitter with no 'error' listener throws the emitted error, which would
  // take the whole process down; log it instead.
  changeStream.on('error', err => {
    console.error('change stream error on ' + coll + ':', err);
  });

  return changeStream;
}

/**
 * Connect to MongoDB at `uri` and start the insert watcher on the
 * 'action_traces' collection of the 'EOS' database.
 *
 * Any error thrown while connecting or starting the watcher is logged,
 * not rethrown.
 *
 * @param {string} uri MongoDB connection string.
 * @returns {Promise<void>}
 */
async function run(uri) {
    try {
        // Declared with `const`: a bare `con = ...` creates an implicit global
        // and throws a ReferenceError in strict mode / ES modules.
        const con = await MongoClient.connect(uri, {"useNewUrlParser": true});
        watch_insert(con, 'EOS', 'action_traces');
    } catch (err) {
        console.log(err);
    }
}

Upvotes: 2

Views: 3745

Answers (2)

iGroza
iGroza

Reputation: 919

Solution for macOS users, using homebrew

You can adapt it to your own setup by finding the equivalent of each step below.

  1. Stop mongo service: brew services stop mongodb-community

  2. Edit /opt/homebrew/etc/mongod.conf

  3. Add these lines:

    replication:
       replSetName: "rs0"
  4. Save the file and open the mongo shell: mongo

  5. Run this command: rs.initiate()

  6. Exit from the mongo shell: exit

  7. Start the mongo service: brew services start mongodb-community

  8. In your mongo client configuration, add this option: replicaSet=rs0

And the "The $changeStream stage is only supported on replica sets" error (raised when MongoDB is not running as a replica set) will go away!

Upvotes: 0

kevinadi
kevinadi

Reputation: 13775

You would need to:

  1. Specify operationType: 'insert'. Since you don't want to monitor updates, you don't need updateLookup.
  2. Create a proper aggregation pipeline for your filter that includes operationType.
  3. The aggregation pipeline filters the documents that are returned by watch(). An example output is in the Change Events page.

watch() returns a ChangeStream. It fires close, change, end, and error events. See ChangeStream for more details.

Here's a full example of a change stream that listens for insert operations on the collection test of the database test. It will output documents that have the field {a: 1} ('fullDocument.a': 1) and will ignore updates, inserts of other values of a, or anything without the field a.

const MongoClient = require('mongodb').MongoClient
// Connection string: local mongod, database "test", replica set "replset".
const uri = 'mongodb://localhost:27017/test?replicaSet=replset'

// Change-stream pipeline: keep only insert events whose inserted document
// has the field a equal to 1.
const insert_pipeline = [
  {
    $match: {
      operationType: 'insert',
      'fullDocument.a': 1,
    },
  },
]

/**
 * Open a change stream on `db.coll` filtered by the module-level
 * `insert_pipeline` and log every change event it delivers.
 *
 * @param {object} con  Connected client; must expose
 *                      `db(name).collection(name).watch(pipeline)`.
 * @param {string} db   Database name.
 * @param {string} coll Collection name.
 * @returns {object} The change stream returned by `watch()`, so the caller
 *                   can close it or attach further listeners.
 */
function watch_insert(con, db, coll) {
  console.log(new Date() + ' watching: ' + coll)

  const changeStream = con.db(db).collection(coll).watch(insert_pipeline)

  changeStream.on('change', data => {
    console.log(data)
  })

  // An emitter with no 'error' listener throws the emitted error, which would
  // take the whole process down; log it instead.
  changeStream.on('error', err => {
    console.error('change stream error on ' + coll + ':', err)
  })

  return changeStream
}

/**
 * Connect to MongoDB using the module-level `uri` and start the insert
 * watcher on the 'test' collection of the 'test' database.
 *
 * @returns {Promise<void>} Rejects if connecting or starting the watcher fails.
 */
async function run() {
  // Declared with `const`: a bare `con = ...` creates an implicit global
  // and throws a ReferenceError in strict mode / ES modules.
  const con = await MongoClient.connect(uri, {"useNewUrlParser": true})
  watch_insert(con, 'test', 'test')
}

// Start the watcher. run() returns a promise; handle its rejection here so a
// failed connection is reported instead of surfacing as an unhandled rejection.
run().catch(err => {
  console.error(err)
})

Upvotes: 5

Related Questions