Reputation: 116
I'm trying to design a workflow using Highland.js. I have not been able to figure out how Highland.js can be used for it.
I have a stream-based workflow as below (pseudocode):
read //fs.createReadStream(...)
.pipe(parse) //JSONStream.parse(...)
.pipe(filterDuplicate) //mongoClient.db.collection.count({}) > 0
.pipe(transform) //fn(item) { return transform(item); }
.pipe(write); //mongoClient.db.collection.insert(doc)
The filterDuplicate step looks up the database to check whether the record just read already exists (using a condition) and returns a boolean result. For the filter to work, it needs an active DB connection, which I want to reuse until the stream is complete. One way is to open a connection before the read and close it on the 'finish' event of write. This means I need to pass the connection as a parameter to filter and write, which would work if both methods use the same database.
In the above workflow, filterDuplicate and write may also use different databases. So I would expect the connection to be contained and managed within each function, which makes each one a self-contained, reusable unit.
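As a concrete illustration of such a self-contained unit, here is a minimal sketch that uses only Node's core `stream` module rather than Highland. The names `fakeConnect`, `count`, `close` and `runDemo` are hypothetical stand-ins for a real database driver, and the sketch assumes a record counts as a duplicate when the lookup returns more than zero. The stage opens its connection lazily on the first record, reuses it for every record, and closes it once the input has ended or the stream is destroyed.

```javascript
const { Transform, Readable, Writable, pipeline } = require('stream');

// Hypothetical stand-in for a real database client: `connect()` resolves to an
// object exposing `count(query)` and `close()`. Replace with real driver calls.
function fakeConnect(existingIds, log) {
  return async function connect() {
    log.push('open');
    return {
      count: async (query) => (existingIds.has(query.id) ? 1 : 0),
      close: async () => { log.push('close'); },
    };
  };
}

// Build a filter stage that owns its connection, so callers never pass one in.
function filterDuplicate(connect) {
  let conn = null;

  // Close the connection at most once, whichever hook gets there first.
  const release = async () => {
    if (!conn) return;
    const current = conn;
    conn = null;
    await current.close();
  };

  return new Transform({
    objectMode: true,
    transform(item, _enc, done) {
      (async () => {
        if (!conn) conn = await connect(); // opened lazily, then reused
        const count = await conn.count({ id: item.id });
        return count === 0 ? item : undefined; // drop records that already exist
      })().then((kept) => done(null, kept), done);
    },
    // Runs after the last record has been processed.
    flush(done) {
      release().then(() => done(), done);
    },
    // Also runs on errors, so the connection is not leaked on failure.
    destroy(err, done) {
      release().then(() => done(err), done);
    },
  });
}

// Small demo: records 1..3 go in, record 2 already "exists" in the fake database.
function runDemo(callback) {
  const log = [];
  const kept = [];
  pipeline(
    Readable.from([{ id: 1 }, { id: 2 }, { id: 3 }]),
    filterDuplicate(fakeConnect(new Set([2]), log)),
    new Writable({
      objectMode: true,
      write(item, _enc, done) { kept.push(item.id); done(); },
    }),
    (err) => callback(err, { kept, log })
  );
}
```

A write stage could follow the same pattern with its own `connect` function, so the two stages would not need to share a database. Highland's `through` accepts a Node through-stream, so a stage like this could in principle be passed to it directly.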
I'm looking for any input on how this can be designed using Highland.
Thanks.
Upvotes: 0
Views: 182
Reputation: 453
It's not going to be quite as easy as just using pipe a bunch of times. You've got to use the most appropriate API method for the task.
Here's a rough example of what you're probably going to end up close to:
const h = require('highland');
const JSONStream = require('JSONStream');

h(read) // wrap the Node readable so the Highland methods are available
  .through(JSONStream.parse([true])) // `through` also accepts a Node through-stream
  .map((x) => h((push, next) => { // use a generator for async operations; the signature is (push, next)
    h.wrapCallback( mongoCountQuery )( params ) // you don't have to do it this way
      .toCallback((err, result) => {
        if ( err ) push( err ); // pass the error downstream instead of dropping it
        else if ( result > 0 ) push( null, x ); // if it met the criteria, hold onto it
        push( null, h.nil ); // tell highland this stream is done
      });
  }))
  .merge() // because you've got a stream of streams after that `map`
  .map(transform) // just your standard map through a transform
  .map((doc) => h.wrapCallback( mongoUpdateQuery )( params )) // wrapCallback already returns a stream
  .merge() // another stream-of-streams situation
  .collect() // `toCallback` expects at most one value, so gather the results first
  .toCallback( cb ); // call home to say we're done
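For comparison, and not as a Highland example, the same read, filter, transform, write shape can be tried end to end with only Node's core `stream` module. This is a rough sketch under stated assumptions: `makeStore`, `countExisting`, `insertDoc`, `runWorkflow` and the upper-casing `transform` are hypothetical in-memory stand-ins for the real queries, and a record is treated as a duplicate when the count lookup returns more than zero.

```javascript
const { Readable, Writable, pipeline } = require('stream');

// Hypothetical in-memory stand-in for the two database calls in the workflow.
function makeStore(existingIds) {
  const docs = new Map(existingIds.map((id) => [id, { id }]));
  return {
    docs,
    countExisting: async (item) => (docs.has(item.id) ? 1 : 0),
    insertDoc: async (doc) => { docs.set(doc.id, doc); },
  };
}

// Hypothetical transform step: upper-case the `name` field.
const transform = (item) => ({ ...item, name: item.name.toUpperCase() });

function runWorkflow(items, store, cb) {
  const inserted = [];
  pipeline(
    Readable.from(items),
    // Async lookup per record; records that already exist are not yielded.
    async function* filterDuplicate(source) {
      for await (const item of source) {
        if ((await store.countExisting(item)) === 0) yield item;
      }
    },
    // Plain synchronous mapping of each surviving record.
    async function* transformStage(source) {
      for await (const item of source) yield transform(item);
    },
    // Async write per record; `done` is called only after the insert settles.
    new Writable({
      objectMode: true,
      write(doc, _enc, done) {
        store.insertDoc(doc).then(() => { inserted.push(doc); done(); }, done);
      },
    }),
    // Called once, after every stage has finished or one of them has failed.
    (err) => cb(err, inserted)
  );
}
```

Calling `runWorkflow(items, makeStore([2]), cb)` with three records whose ids are 1, 2 and 3 should hand `cb` the transformed records 1 and 3, since record 2 is already in the store.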
Upvotes: 0