Don
Don

Reputation: 61

NodeJS MongoDB Recurring Tailable Cursor

I'm trying to setup a websocket server with a stream of a tailable cursor. I have a capped collection and have tried using mongo 3.2 and mongo 2.6 to do this so far.

This is the stream I've been trying to get work. It will run once and get all the documents but won't get new documents that I insert. I've seen this code used in countless examples and seen someone stream a chat server but can't seem to get it working for myself.

function startStream(){
const stream = collections.MQS.find({}, {tailable:true, awaitdata:true, numberOfRetries:-1}).stream();

stream.on('data', (doc)=>{
   console.log(doc); 
});    
}

I call this function in the connection:

Mongo.MongoClient.connect (mongodbUri, onConnected);
function onConnected(err, database){
if (err) {throw err;}

// Assign db and collection letiables
collections.db = database;
collections.MQS = database.collection('mqs');

console.log("Connected to: " + mongodbUri);
startStream();
}

Is there a more appropriate way to do this that I can't find?

If this query has to be recursive what is the point of having tailable options vs managing time stamps on a normal find query?

Upvotes: 4

Views: 1993

Answers (2)

Jonathan Muller
Jonathan Muller

Reputation: 7516

It seems that you are not sorting the stream, so I guess you are just tailing the start of the collection: this will not change much unless it reaches the cap of the collection.

Try to add a natural sort in you stream:

collections.MQS.find({}, {tailable:true, awaitdata:true, numberOfRetries:-1}).sort({$natural: -1}).stream();

Latest syntax from the docs:

collections.MQS.find({}).addCursorFlag('tailable', true).stream();

http://mongodb.github.io/node-mongodb-native/2.1/api/Cursor.html#stream

Upvotes: 1

zangw
zangw

Reputation: 48396

Here is one example shows how to set up subscriber function for new MongoDB documents through cursor, and print new documents in console.

/**
 * How to subscribe for new MongoDB documents in Node.js using tailable cursor
 */

// subscriber function
var subscribe = function(){  
  var args = [].slice.call(arguments);
  var next = args.pop();
  var filter = args.shift() || {};

  if('function' !== typeof next) throw('Callback function not defined');

 // connect to MongoDB
  require('mongodb').MongoClient.connect('mongodb://localhost/test', function(err, db){

   // make sure you have created capped collection "messages" on db "test"
    db.collection('messages', function(err, coll) {

     // seek to latest object
      var seekCursor = coll.find(filter).sort({$natural: -1}).limit(1);
      seekCursor.nextObject(function(err, latest) {
        if (latest) {
          filter._id = { $gt: latest._id }
        }

       // set MongoDB cursor options
        var cursorOptions = {
          tailable: true,
          awaitdata: true,
          numberOfRetries: -1
        };

       // create stream and listen
        var stream = coll.find(filter, cursorOptions).sort({$natural: -1}).stream();

       // call the callback
        stream.on('data', next);
      });
    });   
  });     
};

// new documents will appear in the console
subscribe( function(document) {
  console.log(document);  
});

Upvotes: 0

Related Questions