Brad

Reputation: 163568

Implementing a buffered transform stream

I am trying to implement a stream with the new Node.js streams API that will buffer a certain amount of data. When this stream is piped to another stream, or if something consumes readable events, this stream should flush its buffer and then simply become pass-through. The catch is, this stream will be piped to many other streams, and when each destination stream is attached, the buffer must be flushed even if it is already flushed to another stream.

For example:

  1. BufferStream implements stream.Transform, and keeps a 512KB internal ring buffer
  2. ReadableStreamA is piped to an instance of BufferStream
  3. BufferStream writes to its ring buffer, reading data from ReadableStreamA as it comes in. (It doesn't matter if data is lost, as the buffer overwrites old data.)
  4. BufferStream is piped to WritableStreamB
  5. WritableStreamB receives the entire 512KB buffer, and continues to get data as it is written from ReadableStreamA through BufferStream.
  6. BufferStream is piped to WritableStreamC
  7. WritableStreamC also receives the entire 512KB buffer, but this buffer is now different than what WritableStreamB received, because more data has since been written to BufferStream.

Is this possible with the streams API? The only method I can think of would be to create an object with a method that spins up a new PassThrough stream for each destination, meaning I couldn't simply pipe to and from it.
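Something like this sketch is what I mean (MultiTap and its tap() method are hypothetical names, not an existing API):

var stream = require('stream');

// Hypothetical: an object that spins up a new PassThrough per destination
// instead of being a pipeable stream itself.
function MultiTap(source) {
  var self = this;
  this.buffer = []; // stand-in for the 512KB ring buffer
  this.taps = [];
  source.on('data', function (chunk) {
    self.buffer.push(chunk); // real code would evict old data here
    self.taps.forEach(function (t) {
      t.write(chunk);
    });
  });
}

// Each consumer calls tap() and receives the buffered data first,
// followed by live data.
MultiTap.prototype.tap = function () {
  var pt = new stream.PassThrough();
  this.buffer.forEach(function (chunk) {
    pt.write(chunk);
  });
  this.taps.push(pt);
  return pt;
};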

For what it's worth, I've done this with the old "flowing" API by simply listening for new handlers on data events. When a new function was attached with .on('data'), I would call it directly with a copy of the ring buffer.
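For reference, that old-API trick looked roughly like this ('newListener' is the standard EventEmitter event fired when a handler is attached; the ring buffer here is just a stand-in):

var events = require('events');

var source = new events.EventEmitter(); // the old-style stream
var ringBuffer = []; // stand-in for the 512KB ring buffer

// 'newListener' fires just before each new handler is attached
source.on('newListener', function (event, listener) {
  if (event === 'data') {
    // hand the new consumer a copy of everything buffered so far
    ringBuffer.slice().forEach(function (chunk) {
      listener(chunk);
    });
  }
});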

Upvotes: 21

Views: 5369

Answers (2)

Alexander Mills

Reputation: 100320

Paul's answer is good, but I don't think it meets the exact requirements. It sounds like what needs to happen is that every time pipe() is called on this transform stream, it must first flush the buffer that represents all the data accumulated between the time the transform stream was created (or connected to the source stream) and the time it was connected to the current writable/destination stream.

Something like this might be more correct:

var util = require('util');
var stream = require('stream');

var BufferStream = function () {
  stream.Transform.apply(this, arguments);
  this.buffer = []; // an array will do for accumulating chunks
};

util.inherits(BufferStream, stream.Transform);

BufferStream.prototype._transform = function (chunk, encoding, done) {
  // pass the chunk through, and keep a copy to replay to future destinations
  this.push(chunk);
  this.buffer.push(String(chunk));

  done();
};

BufferStream.prototype.pipe = function (destination, options) {
  // pipe as usual, then replay everything accumulated so far
  var res = BufferStream.super_.prototype.pipe.apply(this, arguments);
  this.buffer.forEach(function (b) {
    res.write(String(b));
  });
  return res;
};

var bufferStream = new BufferStream();

I suppose this:

BufferStream.super_.prototype.pipe.apply(this, arguments);

is equivalent to this:

stream.Transform.prototype.pipe.apply(this, arguments);

You could probably optimize this and use some flags when pipe/unpipe are called.
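For example, a rough sketch of that idea (pipeCount is a hypothetical property; unpipe is the standard Readable method):

// Count attached destinations, so the stream could e.g. trim or cap
// this.buffer while at least one consumer is connected.
BufferStream.prototype.pipe = function (destination, options) {
  this.pipeCount = (this.pipeCount || 0) + 1;
  var res = BufferStream.super_.prototype.pipe.apply(this, arguments);
  this.buffer.forEach(function (b) {
    res.write(String(b));
  });
  return res;
};

BufferStream.prototype.unpipe = function (destination) {
  this.pipeCount = Math.max(0, (this.pipeCount || 0) - 1);
  return BufferStream.super_.prototype.unpipe.apply(this, arguments);
};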

Upvotes: 1

Paul Mougel

Reputation: 17048

Here's my take on your issue.

The basic idea is to create a Transform stream, which will allow us to execute your custom buffering logic before sending the data on the output of the stream:

var util = require('util')
var stream = require('stream')

var BufferStream = function (streamOptions) {
  stream.Transform.call(this, streamOptions)
  this.buffer = new Buffer('')
}

util.inherits(BufferStream, stream.Transform)

BufferStream.prototype._transform = function (chunk, encoding, done) {
  // custom buffering logic
  // ie. add chunk to this.buffer, check buffer size, etc.
  this.buffer = Buffer.concat([this.buffer, chunk])

  this.push(chunk)
  done()
}

Then, we need to override the .pipe() method so that we are notified when the BufferStream is piped into a stream, which allows us to automatically write data to it:

BufferStream.prototype.pipe = function (destination, options) {
  var res = BufferStream.super_.prototype.pipe.call(this, destination, options)
  res.write(this.buffer)
  return res
}

In this way, when we write buffer.pipe(someStream), we perform the pipe as intended and write the internal buffer to the output stream. After that, the Transform class takes care of everything, while keeping track of the backpressure and whatnot.
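For example, usage could look like this (the input file name is just for illustration):

var fs = require('fs')

var bufferStream = new BufferStream()
fs.createReadStream('input.log').pipe(bufferStream)

// every destination attached later receives the buffered data first,
// then the live data
bufferStream.pipe(process.stdout)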

Here is a working gist. Please note that I didn't bother writing a correct buffering logic (ie. I don't care about the size of the internal buffer), but this should be easy to fix.
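For instance, a minimal sketch that caps the internal buffer at the 512KB from the question:

var MAX_SIZE = 512 * 1024 // 512KB, per the question

BufferStream.prototype._transform = function (chunk, encoding, done) {
  this.buffer = Buffer.concat([this.buffer, chunk])
  if (this.buffer.length > MAX_SIZE) {
    // keep only the most recent bytes, ring-buffer style
    this.buffer = this.buffer.slice(this.buffer.length - MAX_SIZE)
  }
  this.push(chunk)
  done()
}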

Upvotes: 8
