Krishnan

How to write a filter (using a DB) with Highland.js

I'm trying to design a workflow using Highland.js, but I have not been able to figure out how Highland.js can be used for it.

I have a stream-based workflow like the one below (pseudocode):

read                      //fs.createReadStream(...)
   .pipe(parse)           //JSONStream.parse(...)
   .pipe(filterDuplicate) //mongoClient.db.collection.count({}) > 0
   .pipe(transform)       //fn(item) { return transform(item); }
   .pipe(write);          //mongoClient.db.collection.insert(doc)

The filterDuplicate step queries the database to check whether the record just read already exists (using a condition) and returns a boolean result. For the filter to work, it needs an active DB connection, which I want to reuse until the stream is complete. One way is to open a connection before the read and close it on the 'finish' event of write. This means I need to pass the connection as a parameter to filter and write, which works as long as both methods use the same database.
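For concreteness, the shared-connection approach just described might look like the sketch below. It uses plain Node streams rather than Highland, and `FakeDb` (with its `connect`, `count`, `insert` and `close` methods) and the `id` key are hypothetical stand-ins for a real database client. The connection is opened once, passed into both steps, and closed after `pipeline()` resolves, which only happens once the final writable has finished.

```javascript
const { Readable, Transform, Writable } = require('stream');
const { pipeline } = require('stream/promises');

// Hypothetical in-memory stand-in for a database client (not a real driver API).
class FakeDb {
  constructor(rows = []) { this.rows = rows; this.open = false; }
  async connect() { this.open = true; return this; }
  async count(pred) { return this.rows.filter(pred).length; }
  async insert(doc) { this.rows.push(doc); }
  async close() { this.open = false; }
}

// Both steps receive the same, already-open connection as a parameter.
const filterDuplicate = (conn) => new Transform({
  objectMode: true,
  transform(doc, _enc, cb) {
    conn.count((row) => row.id === doc.id)
      .then((n) => cb(null, n === 0 ? doc : undefined), cb); // undefined drops the record
  },
});

const write = (conn) => new Writable({
  objectMode: true,
  write(doc, _enc, cb) { conn.insert(doc).then(() => cb(), cb); },
});

// Open once before reading; close once the whole pipeline has finished.
async function run(records, db) {
  const conn = await db.connect();
  try {
    await pipeline(Readable.from(records), filterDuplicate(conn), write(conn));
  } finally {
    await conn.close();
  }
}
```

Since the filter can run ahead of the writer, duplicates within a single input batch could slip through, and the whole design only holds while both steps talk to the same database.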

In the above workflow, however, filterDuplicate and write may use different databases. So I would expect each function to contain and manage its own connection, making it a self-contained, reusable unit.
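One way to get such self-contained steps, sketched with plain Node streams rather than Highland: each factory receives its own database handle, opens the connection lazily on the first record, and closes it when its input ends (in `flush` for a `Transform`, in `final` for a `Writable`). `FakeDb` and its methods are hypothetical stand-ins, not a real driver API.

```javascript
const { Readable, Transform, Writable } = require('stream');
const { pipeline } = require('stream/promises');

// Hypothetical in-memory stand-in for a database client (not a real driver API).
class FakeDb {
  constructor(rows = []) { this.rows = rows; this.open = false; }
  async connect() { this.open = true; return this; }
  async count(pred) { return this.rows.filter(pred).length; }
  async insert(doc) { this.rows.push(doc); }
  async close() { this.open = false; }
}

// Owns its connection: opened on the first record, closed in flush() at end of input.
function filterDuplicate(db) {
  let conn = null;
  return new Transform({
    objectMode: true,
    transform(doc, _enc, cb) {
      (async () => {
        conn = conn || (await db.connect());
        return (await conn.count((row) => row.id === doc.id)) === 0 ? doc : undefined;
      })().then((out) => cb(null, out), cb); // undefined means "drop this record"
    },
    flush(cb) { (conn ? conn.close() : Promise.resolve()).then(() => cb(), cb); },
  });
}

// Owns its connection: opened on the first write, closed in final() after the last one.
function write(db) {
  let conn = null;
  return new Writable({
    objectMode: true,
    write(doc, _enc, cb) {
      (async () => {
        conn = conn || (await db.connect());
        await conn.insert(doc);
      })().then(() => cb(), cb);
    },
    final(cb) { (conn ? conn.close() : Promise.resolve()).then(() => cb(), cb); },
  });
}

// The two steps may now point at different databases.
function run(records, sourceDb, targetDb) {
  return pipeline(Readable.from(records), filterDuplicate(sourceDb), write(targetDb));
}
```

Because the steps share nothing, `filterDuplicate` can check one database while `write` inserts into another. Highland's `through` also accepts a Node duplex stream (the answer uses it that way with `JSONStream.parse`), so in principle a step built like this could be plugged in as `.through(filterDuplicate(db))`; that combination is an assumption worth testing rather than something shown here.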

I'm looking for any input on how this can be designed using Highland.

Thanks.

Answers (1)

amsross

It's not going to be quite as easy as just using pipe a bunch of times. You've got to use the most appropriate API method for the task.

Here's a rough example of what you'll probably end up with:

const h = require('highland');
const JSONStream = require('JSONStream');

// `read` is assumed to be a Highland stream, e.g. h(fs.createReadStream(file)).
// `mongoCountQuery`, `mongoUpdateQuery`, `params` and `cb` stand in for your own code.
read
  .through(JSONStream.parse([true]))
  .map((x) => h((push, next) => { // a generator for async operations; note the (push, next) order
    h.wrapCallback( mongoCountQuery )( params ) // you don't have to do it this way
      .toCallback((err, count) => {
        if ( err ) push( err );
        else if ( count === 0 ) push( null, x ); // not in the DB yet, so hold onto it
        push( null, h.nil ); // tell highland this stream is done
      });
  }))
  .merge() // because you've got a stream of streams after that `map`
  .map(transform) // just your standard map through a transform
  .map((doc) => h((push, next) => { // another generator for async operations
    h.wrapCallback( mongoUpdateQuery )( params )
      .toCallback((err, result) => {
        push( err, result );
        push( null, h.nil );
      });
  }))
  .merge() // another stream-of-streams situation
  .collect() // toCallback() errors if the stream emits more than one value
  .toCallback( cb ); // call home to say we're done
