Reputation: 61
I'm trying to set up a websocket server that streams from a tailable cursor. I have a capped collection and have tried using both MongoDB 3.2 and MongoDB 2.6 to do this so far.
This is the stream I've been trying to get to work. It runs once and gets all the documents, but it doesn't get new documents that I insert. I've seen this code used in countless examples, and I've seen someone stream a chat server with it, but I can't seem to get it working for myself.
/**
 * Opens a tailable, await-data cursor on `collections.MQS` and logs every
 * document the resulting stream delivers.
 *
 * Besides `data`, the stream's `end` and `error` events are reported too.
 * With only a `data` listener, a cursor that finishes or fails simply stops
 * producing documents with no indication of why.
 */
function startStream() {
  const stream = collections.MQS.find({}, {tailable: true, awaitdata: true, numberOfRetries: -1}).stream();

  stream.on('data', (doc) => {
    console.log(doc);
  });

  // A tailable cursor is expected to stay open; if the stream ends, nothing
  // inserted afterwards will ever reach the `data` handler, so say so.
  stream.on('end', () => {
    console.error('Tailable cursor stream ended; no further documents will be delivered.');
  });

  // Report cursor failures explicitly instead of leaving them unhandled.
  stream.on('error', (err) => {
    console.error('Tailable cursor stream failed:', err);
  });
}
I call this function in the connection:
/**
 * Callback for `MongoClient.connect`.
 *
 * Rethrows a connection error; otherwise stores the database handle and the
 * 'mqs' collection on the shared `collections` object, logs the URI that was
 * connected to, and starts the stream.
 *
 * @param {?Error} err - Connection error, if any.
 * @param {Object} database - Connected database handle.
 */
function onConnected(err, database) {
  if (err) {
    throw err;
  }

  // Assign db and collection variables
  collections.db = database;
  collections.MQS = database.collection('mqs');

  console.log("Connected to: " + mongodbUri);
  startStream();
}

// `onConnected` is a hoisted function declaration, so it can be passed here
// regardless of where the call sits relative to the definition.
Mongo.MongoClient.connect(mongodbUri, onConnected);
Is there a more appropriate way to do this that I can't find?
If this query has to be recursive what is the point of having tailable options vs managing time stamps on a normal find query?
Upvotes: 4
Views: 1993
Reputation: 7516
It seems that you are not sorting the stream, so I guess you are just tailing the start of the collection: this will not change much unless it reaches the cap of the collection.
Try to add a natural sort in your stream:
// A tailable cursor may only be sorted in forward natural (insertion) order;
// the server rejects any other sort, including {$natural: -1}.
collections.MQS.find({}, {tailable: true, awaitdata: true, numberOfRetries: -1}).sort({$natural: 1}).stream();
Latest syntax from the docs:
// Set both flags: 'tailable' keeps the cursor open after the last document,
// and 'awaitData' makes the server wait for new documents instead of
// returning immediately when none are available.
collections.MQS.find({})
  .addCursorFlag('tailable', true)
  .addCursorFlag('awaitData', true)
  .stream();
http://mongodb.github.io/node-mongodb-native/2.1/api/Cursor.html#stream
Upvotes: 1
Reputation: 48396
Here is an example that shows how to set up a subscriber function for new MongoDB documents through a tailable cursor, and print the new documents to the console.
/**
 * How to subscribe for new MongoDB documents in Node.js using tailable cursor
 */

/**
 * Subscribes a callback to documents inserted into the capped collection
 * "messages" of database "test" on localhost.
 *
 * Call as `subscribe(next)` or `subscribe(filter, next)`:
 *   - filter {Object}   optional query filter; it is copied, never modified
 *   - next   {Function} called with each new document delivered by the stream
 *
 * @throws {Error} if the last argument is not a function.
 */
var subscribe = function (...args) {
  const next = args.pop();
  // Work on a copy so adding the _id bound below does not alter the caller's object.
  const filter = Object.assign({}, args.shift());

  if ('function' !== typeof next) throw new Error('Callback function not defined');

  // connect to MongoDB
  require('mongodb').MongoClient.connect('mongodb://localhost/test', function (connectErr, db) {
    // Fail with the real cause instead of a TypeError on a missing `db`.
    if (connectErr) throw connectErr;

    // make sure you have created capped collection "messages" on db "test"
    db.collection('messages', function (collectionErr, coll) {
      if (collectionErr) throw collectionErr;

      // seek to latest object
      const seekCursor = coll.find(filter).sort({$natural: -1}).limit(1);

      seekCursor.nextObject(function (seekErr, latest) {
        if (seekErr) throw seekErr;

        // Only deliver documents newer than the latest one already stored.
        if (latest) {
          filter._id = {$gt: latest._id};
        }

        // set MongoDB cursor options
        const cursorOptions = {
          tailable: true,
          awaitdata: true,
          numberOfRetries: -1
        };

        // create stream and listen; a tailable cursor only accepts forward
        // natural order, so the reverse sort is used for the seek above only
        const stream = coll.find(filter, cursorOptions).sort({$natural: 1}).stream();

        // call the callback
        stream.on('data', next);
      });
    });
  });
};

// new documents will appear in the console
subscribe(function (document) {
  console.log(document);
});
Upvotes: 0