Chris Vandevelde

Reputation: 1451

Asynchronous transforms on streams in Highland.js

I'm trying to use Highland.js for a database update script on a set of Mongoose models; it seems like a perfect fit for a QueryStream from a Model.find() call. I have some synchronous things to do (updating my model to conform to a new schema, a few cleanup operations), and at the end I want to save() each document. I have some pre-save hooks configured that need to run, and the updates aren't really compatible with a straight Model.update(). I've managed to get it sort of working through a combination of Q.js and Highland:

var sender_stream = Sender.find({}).stream();
var promise_save = function(document) {
    var deferred = Q.defer();
    document.save(deferred.makeNodeResolver());
    return _(deferred.promise);
}

var sender_deferred = Q.defer();
_(sender_stream).map(function(sender) {
    // set some fields on sender...
    return sender;
}).map(promise_save).series().on('done', sender_deferred.resolve).resume();

However, this doesn't seem to resolve the promise, and I'm not sure if this is the "right" way to keep things nice and stream-y. It also seems weird to combine Q.js and Highland.js so intimately. Is there a better way?

Upvotes: 1

Views: 1690

Answers (2)

rmarscher

Reputation: 5642

Instead of a Promise, you can use Highland's async function capabilities: http://highlandjs.org/#async. Mongoose's save() also returns a Promise, so you could wrap that with Highland instead of using the async function style and still avoid adding Q.
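As a minimal sketch of dropping Q, a Node-style `save(callback)` can be wrapped in a native `Promise` instead of `Q.defer()` plus `makeNodeResolver()`. `FakeDocument` and `promiseSave` are hypothetical names; `FakeDocument` only stands in for a Mongoose document so the snippet runs on its own.

```javascript
// Hypothetical stand-in for a Mongoose document: save(cb) calls back
// asynchronously with (err, doc), like the callback form of save().
class FakeDocument {
  constructor(name, shouldFail = false) {
    this.name = name;
    this.shouldFail = shouldFail;
    this.saved = false;
  }

  save(cb) {
    setImmediate(() => {
      if (this.shouldFail) {
        cb(new Error('save failed: ' + this.name));
        return;
      }
      this.saved = true;
      cb(null, this);
    });
  }
}

// Native replacement for Q.defer() + deferred.makeNodeResolver():
// resolves with the saved document or rejects with the callback's error.
function promiseSave(document) {
  return new Promise((resolve, reject) => {
    document.save((err, result) => (err ? reject(err) : resolve(result)));
  });
}

promiseSave(new FakeDocument('sender-1')).then((doc) => {
  console.log('saved', doc.name);
});
```

With a real document, the returned promise can be handed to `_()` the same way the question passes `deferred.promise`, giving a one-item Highland stream without pulling in Q.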

I would recommend using .flatMap() instead of .map() and .series() to flatten those streams back into one document stream. Adding .done() at the end then consumes the stream and calls your callback once it has ended, so you don't need .resume() combined with a 'done' event listener.
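The ordering that `.flatMap(save)` gives (Highland documents it as equivalent to `.map(f).sequence()`, so each save stream is consumed completely before the next document is pulled) can be sketched without Highland using Node's built-in streams and `for await`. This is a swapped-in technique, not Highland's implementation; `makeDoc`, `saveAsPromise` and `saveAllInSeries` are hypothetical, and the returned promise plays the role of the `.done()` callback.

```javascript
const { Readable } = require('stream');

// Hypothetical document whose save(cb) takes a few milliseconds and logs
// when it starts and finishes, so the ordering is visible.
function makeDoc(id, log) {
  return {
    id,
    save(cb) {
      log.push('start ' + id);
      setTimeout(() => {
        log.push('end ' + id);
        cb(null, this);
      }, 5);
    },
  };
}

function saveAsPromise(doc) {
  return new Promise((resolve, reject) => {
    doc.save((err, saved) => (err ? reject(err) : resolve(saved)));
  });
}

// Roughly map(setFields).flatMap(save).done(...): the next document is not
// read until the previous save has finished. Resolves with the number of
// saved documents; rejects on the first failed save.
async function saveAllInSeries(source, setFields) {
  let count = 0;
  for await (const doc of source) {
    await saveAsPromise(setFields(doc));
    count++;
  }
  return count;
}

const log = [];
const docs = [1, 2, 3].map((id) => makeDoc(id, log));
saveAllInSeries(Readable.from(docs), (doc) => doc).then((count) => {
  console.log(count, log);
});
```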

Honestly, I'm not 100% sure why you are having issues with the 'done' event being called.

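var _ = require('highland');
var Q = require('q');

var sender_stream, set_fields, save, sender_deferred;

// Sender is the Mongoose model from the question
sender_stream = Sender.find({}).stream();

// Wrap the callback-style document.save() in a Highland generator stream
// that emits the saved document (or the save error) and then ends.
save = function save(document) {
    return _(function(push, next) {
        document.save(function(err) {
            push(err, document);
            push(null, _.nil);
        });
    });
};

set_fields = function setFields(sender) {
    // set some fields on sender...
    return sender;
};

sender_deferred = Q.defer();

_(sender_stream)
    // set_fields, not setFields: a named function expression's name is only in scope inside its own body
    .map(set_fields)
    // each save() returns a one-item stream; flatMap consumes them one at a time
    .flatMap(save)
    // reject on the first failed save instead of leaving the error unhandled
    .stopOnError(function(err) {
        sender_deferred.reject(err);
    })
    // stopOnError ends the stream, so done() still runs; resolving an
    // already-rejected deferred has no effect
    .done(function() {
        sender_deferred.resolve();
    });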

Upvotes: 1

chrisbajorin

Reputation: 6153

I don't know much about Q or Highland, but this seems like a straightforward use case for the transform option on QueryStreams.

// Sender is the Mongoose model from the question
var stream = Sender.find({}).stream({ transform: manipulate });

function manipulate(document) {
    // do stuff here
    return document;
}

stream.on("data", function(document) {
    stream.pause();
    document.save(function(error) {
        if (error) {
            // stop immediately; destroy(err) passes the error to the "error" listener
            return stream.destroy(error);
        }
        stream.resume();
    });
});

stream.on("error", function(err) {
    // error handle
});

stream.on("close", function() {
    console.log("hopefully this worked for you");
});

The transform function runs on each document before the 'data' event is emitted, and its return value is what the 'data' handler receives. Then you just pause, save, and resume.
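The pause/save/resume pattern can be tried without a database by using a Node core `Readable` in place of the QueryStream. This is a minimal sketch under that assumption: `makeDoc`, `saveWithPauseResume` and the `'bad'` id are hypothetical, `manipulate` is applied in the `'data'` handler to stand in for the `transform` option, and the stats exist only to show that at most one save is in flight. One difference from the answer's code: a core `Readable` may emit `'end'` while it is paused on its last document, so the sketch reports completion only once the source has ended and no save is pending.

```javascript
const { Readable } = require('stream');

// Hypothetical document: save(cb) finishes after 5 ms and fails for id 'bad'.
function makeDoc(id) {
  return {
    id,
    save(cb) {
      setTimeout(() => cb(id === 'bad' ? new Error('cannot save ' + id) : null), 5);
    },
  };
}

// Stands in for the QueryStream `transform` option.
function manipulate(document) {
  document.migrated = true;
  return document;
}

// Saves documents one at a time by pausing on 'data' and resuming in the
// save callback. onFinish(err, stats) is called exactly once.
function saveWithPauseResume(docs, onFinish) {
  const stream = Readable.from(docs);
  let inFlight = 0;
  let maxInFlight = 0;
  let saved = 0;
  let ended = false;
  let finished = false;

  function finish(err) {
    if (finished) return;
    finished = true;
    onFinish(err || null, { saved, maxInFlight });
  }

  stream.on('data', (raw) => {
    const document = manipulate(raw);
    stream.pause(); // no further 'data' events until this save completes
    inFlight++;
    maxInFlight = Math.max(maxInFlight, inFlight);
    document.save((err) => {
      inFlight--;
      if (err) {
        stream.destroy(); // stop reading further documents
        finish(err);
        return;
      }
      saved++;
      if (ended && inFlight === 0) finish(null);
      else stream.resume();
    });
  });

  // 'end' can arrive while the last save is still pending.
  stream.on('end', () => {
    ended = true;
    if (inFlight === 0) finish(null);
  });
  stream.on('error', finish);
}

saveWithPauseResume(['a', 'b', 'c'].map(makeDoc), (err, stats) => {
  if (err) {
    console.error(err);
    return;
  }
  console.log(stats);
});
```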

Upvotes: 2
