brandonscript

Reputation: 73024

How do I write a Node.js module to handle an incoming piped stream

I'm trying to write a Node module that accepts an incoming piped binary (or base64-encoded) stream, but frankly I don't even know where to start. I can't find any examples in the Node docs about handling incoming streams; I only see examples of consuming them.

Say for example I want to be able to do this:

var asset = new ProjectAsset('myFile', __dirname + '/image.jpg')
var stream = fs.createReadStream(__dirname + '/image.jpg', { encoding: 'base64' }).pipe(asset)
stream.on('finish', function() {
    done()
})

I've gotten ProjectAsset looking like this, but I'm at a loss as to where to go next:

'use strict'

var stream = require('stream'),
    util = require('util')

var ProjectAsset = function() {
    var self = this

    Object.defineProperty(self, 'binaryData', {
        configurable: true,
        writable: true
    })

    stream.Stream.call(self)

    self.on('pipe', function(src) {
        // does it happen here? how do I set self.binaryData?
    })

    return self
}

util.inherits(ProjectAsset, stream.Stream)

module.exports = ProjectAsset
module.exports.DEFAULT_FILE_NAME = 'file'

Upvotes: 4

Views: 866

Answers (3)

saintedlama

Reputation: 6898

Since you're using var asset = new ProjectAsset('myFile', __dirname + '/image.jpg'), I suppose ProjectAsset's responsibility is to take some input stream, apply some transformations, and write the result to a file. You could implement a transform stream, because you receive input from a stream and generate output that can be saved to a file or piped to some other write stream.

You could of course implement a transform stream by inheriting from Node.js's stream.Transform, but inheriting is quite cumbersome, so my implementation uses through2 to build the transform stream:

var through2 = require('through2');

module.exports = through2(function (chunk, enc, callback) {
  // This function is called whenever a piece of data from the incoming stream is read
  // Transform the chunk, or buffer it in case you need more data to transform

  // Emit a data package to the next stream in the pipe, or omit this call if you
  // need more data from the input stream to be read first
  this.push(chunk);

  // Signal through2 that you processed the incoming data package
  callback();
});
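For comparison, here is a rough sketch of the same pass-through transform built by inheriting from the built-in stream.Transform class; this is the boilerplate through2 saves you (the class name here is just illustrative). Either version can be dropped into the pipeline shown under Usage below.

var stream = require('stream'),
    util = require('util');

var PassThroughTransform = function(options) {
    stream.Transform.call(this, options);
};
util.inherits(PassThroughTransform, stream.Transform);

PassThroughTransform.prototype._transform = function(chunk, encoding, callback) {
    // Forward the chunk unchanged; a real transform would modify it here
    this.push(chunk);
    callback();
};

module.exports = PassThroughTransform;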

Usage

var stream = fs.createReadStream(__dirname + '/image.jpg', { encoding: 'base64' })
               .pipe(projectAsset)
               .pipe(fs.createWriteStream(__dirname + '/image-copy.jpg'));

As this example shows, implementing a stream pipeline fully decouples transforming the data from saving it. (Note the write stream targets a different file than the read stream; reading and writing the same path at once would truncate the input.)
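The callback body is where real work would go. As an illustrative sketch (not part of the original answer), a through2 transform that decodes the base64 text produced by { encoding: 'base64' } back into binary could buffer any trailing partial 4-character group between chunks:

var through2 = require('through2');

// Hypothetical helper: decodes a base64 text stream back into binary.
// Base64 encodes in 4-character groups, so a trailing partial group is
// carried over to the next chunk.
function base64Decoder() {
    var remainder = '';
    return through2(function (chunk, enc, callback) {
        var text = remainder + chunk.toString();
        var usable = text.length - (text.length % 4);
        remainder = text.slice(usable);
        if (usable) {
            this.push(Buffer.from(text.slice(0, usable), 'base64'));
        }
        callback();
    }, function (callback) {
        // Flush whatever is left once the input ends
        if (remainder) this.push(Buffer.from(remainder, 'base64'));
        callback();
    });
}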

Factory Function

If you prefer a constructor-like approach in the project asset module, because you need to pass in some values or configuration, you can just as easily export a constructor function, as shown below:

var through2 = require('through2');

module.exports = function(someData) {

  // New stream is returned that can use someData argument for doing things
  return through2(function (chunk, enc, callback) {
    // This function is called whenever a piece of data from the incoming stream is read
    // Transform the chunk or buffer the chunk in case you need more data to transform

    // Emit a data package to the next stream in the pipe or omit this call if you need more data from the input stream to be read
    this.push(chunk);

    // Signal through2 that you processed the incoming data package
    callback();
  });
}

Usage

var stream = fs.createReadStream(__dirname + '/image.jpg', { encoding: 'base64' })
               .pipe(projectAsset({ foo: 'bar' }))
               .pipe(fs.createWriteStream(__dirname + '/image-copy.jpg'));
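A side benefit of the factory approach is that every call to projectAsset() returns a fresh through2 instance, so the same module can sit in several pipelines at once without the streams sharing any state.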

Upvotes: 2

pohlman

Reputation: 331

It is possible to inherit from stream.Stream and make it work; however, based on what's available in the documentation, I would suggest inheriting from stream.Writable. To pipe into a stream.Writable, you'll need to define _write(chunk, encoding, done) to handle the piped data. Here is an example:

var asset = new ProjectAsset('myFile', __dirname + '/image.jpg')
var stream = fs.createReadStream(__dirname + '/image.jpg', { encoding: 'base64' }).pipe(asset)
stream.on('finish', function() {
    console.log(asset.binaryData);
})

Project Asset

'use strict'

var stream = require('stream'),
    util = require('util')

var ProjectAsset = function() {
    var self = this

    self.data = null
    self.binaryData = [];

    stream.Writable.call(self)

    self._write = function(chunk, encoding, done) {
        // Handle this data however you want; keep the raw Buffer so the
        // chunks can be concatenated later (Buffer.concat requires an
        // array of Buffers, not strings)
        self.binaryData.push(chunk)
        // Call after processing the data
        done()
    }
    self.on('finish', function() {
        self.data = Buffer.concat(self.binaryData)
    })

    return self
}

util.inherits(ProjectAsset, stream.Writable)

module.exports = ProjectAsset
module.exports.DEFAULT_FILE_NAME = 'file'

If you're looking to also read from the stream, take a look at inheriting from stream.Duplex and implementing the _read(size) method as well.
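As a minimal sketch (ignoring backpressure, and with an invented class name), a Duplex that echoes written data out of its readable side could look like this:

var stream = require('stream'),
    util = require('util');

var EchoAsset = function() {
    stream.Duplex.call(this);
    // End the readable side once the writable side finishes
    this.on('finish', function() {
        this.push(null);
    });
};
util.inherits(EchoAsset, stream.Duplex);

EchoAsset.prototype._write = function(chunk, encoding, done) {
    // Hand each written chunk straight to the readable side
    this.push(chunk);
    done();
};

EchoAsset.prototype._read = function(size) {
    // No-op: data is pushed from _write as it arrives
};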

There's also the simplified constructor API if you're doing something simpler.
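With the simplified constructor API you pass the write function in the options object instead of subclassing; a minimal sketch of the same chunk-collecting writable:

var stream = require('stream');

var chunks = [];
var sink = new stream.Writable({
    write: function(chunk, encoding, done) {
        chunks.push(chunk);
        done();
    }
});

sink.on('finish', function() {
    console.log(Buffer.concat(chunks));
});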

Upvotes: 4

Binvention

Reputation: 1077

I'm not sure if this is exactly what you were looking for, but I think you could handle it using the Buffer API: call Buffer.concat on an array of buffers collected from the chunk argument in the stream's data listener.

'use strict'

var stream = require('stream'),
    util = require('util');

var ProjectAsset = function() {
    var self = this

    Object.defineProperty(self, 'binaryData', {
        configurable: true,
        writable: true
    })

    stream.Stream.call(self)
    var data;
    var dataBuffer = [];
    self.on('data', function(chunk) {
        dataBuffer.push(chunk);
    }).on('end', function() {
        data = Buffer.concat(dataBuffer);
        // Only set binaryData here: the full payload exists only after
        // every chunk has arrived, not at construction time
        self.binaryData = data.toString('binary');
    });
    return self
}

util.inherits(ProjectAsset, stream.Stream)

module.exports = ProjectAsset
module.exports.DEFAULT_FILE_NAME = 'file'

Upvotes: 2
