Reputation: 1451
I'm trying to use Highland.js for a database update script on a set of Mongoose models; it seems like a good fit for the QueryStream you get from a Model.find() call. I have some synchronous things to do (updating my model to conform to a new schema, plus a few cleanup operations), and at the end I want to save() the document. I have some pre-save hooks configured which need to run, and the updates aren't really compatible with a straight Model.update(). I've managed to get it sort of working through a combination of Q.js and Highland:
var sender_stream = Sender.find({}).stream();
var promise_save = function(document) {
  var deferred = Q.defer();
  document.save(deferred.makeNodeResolver());
  return _(deferred.promise);
}
var sender_deferred = Q.defer();
_(sender_stream).map(function(sender) {
  // set some fields on sender...
  return sender;
}).map(promise_save).series().on('done', sender_deferred.resolve).resume();
However, this doesn't seem to resolve the promise and I'm not sure if this is the "right" way to keep things nice and stream-y...it also seems weird to combine Q.js and Highland.js so intimately. Is there a better way?
Upvotes: 1
Views: 1690
Reputation: 5642
Instead of a Promise, you can use Highland's async function capabilities: http://highlandjs.org/#async. Mongoose also returns a Promise, so you could wrap that with Highland instead of the async function style but still avoid adding Q.
I would recommend using .flatMap() instead of .map() and .series() to flatten those streams back into one document stream. Then .done() can be added to create a Thunk, instead of using .resume() combined with the 'done' event listener.
Honestly, I'm not 100% sure why you are having issues with the 'done' event being called.
var sender_stream, set_fields, save, sender_deferred;
sender_stream = Sender.find({}).stream();
// Wrap document.save() in a Highland stream that emits the document, then ends.
save = function save(document) {
  return _(function(push, next) {
    document.save(function(err) {
      push(err, document);
      push(null, _.nil);
    });
  });
};
set_fields = function setFields(sender) {
  // set some fields on sender...
  return sender;
};
sender_deferred = Q.defer();
_(sender_stream)
  // pass the set_fields variable; the name setFields is only in scope inside the function itself
  .map(set_fields)
  .flatMap(save)
  .done(function() {
    sender_deferred.resolve();
  });
Upvotes: 1
Reputation: 6153
I don't know much about Q or Highland. But this seems like a straightforward use case for the transform function on querystreams.
var stream = Sender.find({}).stream({ transform: manipulate });
function manipulate(document) {
  // do stuff here
  return document;
}
stream.on("data", function(document) {
  stream.pause();
  document.save(function(error) {
    // error handle, maybe stream.destroy([err]) if you want it to stop immediately
    stream.resume();
  });
});
stream.on("error", function(err) {
  // error handle
});
stream.on("close", function() {
  console.log("hopefully this worked for you");
});
The transform function will run on the document prior to emitting the 'data' event. Once the transform function has done its work, its return value is passed to the 'data' handler. Then you just pause/save/resume.
Upvotes: 2