AdamPat

Reputation: 101

Node - Abstracting Pipe Steps into Function

I'm familiar with Node streams, but I'm struggling on best practices for abstracting code that I reuse a lot into a single pipe step.

Here's a stripped down version of what I'm writing today:

inputStream
  .pipe(csv.parse({columns: true}))
  .pipe(csv.transform(function(row) { return transform(row); }))
  .pipe(csv.stringify({header: true}))
  .pipe(outputStream);

The actual work happens in transform(). The only things that really change are inputStream, transform(), and outputStream. Like I said, this is a stripped down version of what I actually use. I have a lot of error handling and logging on each pipe step, which is ultimately why I'm trying to abstract the code.
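
For context, the per-step handling looks roughly like this (the console.error calls just stand in for my real logging and error handling):

inputStream
  .pipe(csv.parse({columns: true}))
  .on('error', function(err) { console.error('parse error', err); })
  .pipe(csv.transform(function(row) { return transform(row); }))
  .on('error', function(err) { console.error('transform error', err); })
  .pipe(csv.stringify({header: true}))
  .on('error', function(err) { console.error('stringify error', err); })
  .pipe(outputStream);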

What I'm looking to write is a single pipe step, like so:

inputStream
.pipe(csvFunction(transform))
.pipe(outputStream);

What I'm struggling to understand is how to turn those pipe steps into a single function that accepts a stream and returns a stream. I've looked at libraries like through2, but I'm not sure how that gets me to where I'm trying to go.
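
From the through2 docs, a bare-bones step looks something like the identity transform below, which just passes chunks along unchanged, but I don't see how to wrap the whole parse/transform/stringify chain in one of these:

var through = require('through2');

// Identity step: push each chunk through unchanged, then signal completion.
var identityStep = through(function(chunk, enc, callback) {
  this.push(chunk);
  callback();
});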

Upvotes: 0

Views: 537

Answers (2)

AdamPat

Reputation: 101

Here's what I ended up going with. I used the through2 library and the streaming API of the csv library to create the pipe function I was looking for.

var csv = require('csv');
var through = require('through2');

module.exports = function(transformFunc) {
    var parser = csv.parse({columns: true, relax_column_count: true}),
        transformer = csv.transform(function(row) {
            return transformFunc(row);
        }),
        stringifier = csv.stringify({header: true});

    return through(function(chunk, enc, cb) {
        var stream = this;

        parser.on('data', function(data) {
            transformer.write(data);
        });

        transformer.on('data', function(data) {
            stringifier.write(data);
        });

        stringifier.on('data', function(data) {
            stream.push(data);
        });

        parser.write(chunk);

        // Detach the per-chunk listeners so they don't accumulate across
        // chunks (see the note below about listener build-up).
        parser.removeAllListeners('data');
        transformer.removeAllListeners('data');
        stringifier.removeAllListeners('data');
        cb();
    });
};

It's worth noting the part where I remove the event listeners towards the end; this was due to running into memory errors because I had created too many event listeners. I initially tried solving this problem by listening to events with once, but that prevented subsequent chunks from being read and passed on to the next pipe step.
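
For completeness, here's roughly how I wire this up (assuming the module above is saved as csv-pipe.js; the file names are just examples):

var fs = require('fs');
var csvPipe = require('./csv-pipe');

fs.createReadStream('input.csv')
  .pipe(csvPipe(function(row) {
    // per-row transform goes here
    return row;
  }))
  .pipe(fs.createWriteStream('output.csv'));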

Let me know if anyone has feedback or additional ideas.

Upvotes: 0

Marc

Reputation: 1974

You can use the PassThrough class for this. The idea is to listen for the 'pipe' event, unpipe the source from the PassThrough itself, build the real csv pipeline off the source, and override pipe() so that piping out of csvStream actually pipes out of the end of that internal chain:

var csv = require('csv'); // same csv package as in the question
var PassThrough = require('stream').PassThrough;

var csvStream = new PassThrough();
csvStream.on('pipe', function (source) {
  // undo piping of source
  source.unpipe(this);
  // build own pipe-line and store internally
  this.combinedStream =
    source.pipe(csv.parse({columns: true}))
      .pipe(csv.transform(function (row) {
        return transform(row);
      }))
      .pipe(csv.stringify({header: true}));
});

csvStream.pipe = function (dest, options) {
  // pipe internal combined stream to dest
  return this.combinedStream.pipe(dest, options);
};

inputStream
  .pipe(csvStream)
  .pipe(outputStream);

Upvotes: 2
