Reputation: 163568
I am trying to implement a stream with the new Node.js streams API that will buffer a certain amount of data. When this stream is piped to another stream, or if something consumes readable events, this stream should flush its buffer and then simply become pass-through. The catch is, this stream will be piped to many other streams, and when each destination stream is attached, the buffer must be flushed even if it is already flushed to another stream.
For example:
1. BufferStream implements stream.Transform, and keeps a 512KB internal ring buffer.
2. ReadableStreamA is piped to an instance of BufferStream.
3. BufferStream writes to its ring buffer, reading data from ReadableStreamA as it comes in. (It doesn't matter if data is lost, as the buffer overwrites old data.)
4. BufferStream is piped to WritableStreamB.
5. WritableStreamB receives the entire 512KB buffer, and continues to get data as it is written from ReadableStreamA through BufferStream.
6. BufferStream is piped to WritableStreamC.
7. WritableStreamC also receives the entire 512KB buffer, but this buffer is now different than what WritableStreamB received, because more data has since been written to BufferStream.

Is this possible with the streams API? The only method I can think of would be to create an object with a method that spins up a new PassThrough stream for each destination, meaning I couldn't simply pipe to and from it.
For what it's worth, I've done this with the old "flowing" API by simply listening for new handlers on data events. When a new function was attached with .on('data'), I would call it directly with a copy of the ring buffer.
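For reference, the old-style approach described above could be sketched with the `newListener` event, which Node's `EventEmitter` emits just before a listener is added. `ReplayEmitter`, `recent`, and `feed` are made-up names, and a plain array stands in for the ring buffer, so treat this as an illustration under those assumptions rather than the original code:

```javascript
const { EventEmitter } = require('events')

// Hypothetical emitter standing in for the old-style stream.
class ReplayEmitter extends EventEmitter {
  constructor () {
    super()
    this.recent = [] // stand-in for the ring buffer (unbounded here)

    // 'newListener' fires before the listener is registered, so the
    // replay reaches the new handler ahead of any live 'data' event.
    this.on('newListener', (event, listener) => {
      if (event !== 'data') return
      const copy = Buffer.concat(this.recent)
      if (copy.length > 0) listener(copy)
    })
  }

  // Hypothetical entry point for incoming data.
  feed (chunk) {
    this.recent.push(chunk)
    this.emit('data', chunk)
  }
}
```

Each handler attached with .on('data') gets one replay of whatever is buffered at that moment, and then receives live chunks like any other listener.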
Upvotes: 21
Views: 5369
Reputation: 100320
Paul's answer is good, but I don't think it meets the exact requirements. It sounds like every time pipe() is called on this transform stream, it first needs to flush the buffer holding all the data accumulated between the time the transform stream was created (or connected to the source stream) and the time it was connected to the current writable/destination stream.
Something like this might be more correct:
var stream = require('stream');
var util = require('util');

var BufferStream = function () {
    stream.Transform.apply(this, arguments);
    this.buffer = []; // I guess an array will do
};
util.inherits(BufferStream, stream.Transform);

BufferStream.prototype._transform = function (chunk, encoding, done) {
    // Keep every chunk as-is (no String() conversion, which would corrupt
    // binary data) so it can be replayed to destinations piped later.
    this.buffer.push(chunk);
    this.push(chunk);
    done();
};

BufferStream.prototype.pipe = function (destination, options) {
    var res = BufferStream.super_.prototype.pipe.apply(this, arguments);
    // Replay everything accumulated so far to the newly attached destination.
    this.buffer.forEach(function (b) {
        res.write(b);
    });
    return res;
};

var bufferStream = new BufferStream();
I suppose this:
BufferStream.super_.prototype.pipe.apply(this, arguments);
is equivalent to this:
stream.Transform.prototype.pipe.apply(this, arguments);
You could probably optimize this and use some flags when pipe/unpipe are called.
Upvotes: 1
Reputation: 17048
Here's my take on your issue.
The basic idea is to create a Transform stream, which will allow us to execute your custom buffering logic before sending the data on the output of the stream:
var util = require('util')
var stream = require('stream')

var BufferStream = function (streamOptions) {
  stream.Transform.call(this, streamOptions)
  // Buffer.alloc / Buffer.from replace the deprecated `new Buffer(...)`
  this.buffer = Buffer.alloc(0)
}
util.inherits(BufferStream, stream.Transform)

BufferStream.prototype._transform = function (chunk, encoding, done) {
  // custom buffering logic
  // i.e. add chunk to this.buffer, check buffer size, etc.
  this.buffer = Buffer.from(chunk)
  this.push(chunk)
  done()
}
Then, we need to override the .pipe() method so that we are notified when the BufferStream is piped into a stream, which allows us to automatically write data to it:
BufferStream.prototype.pipe = function (destination, options) {
  var res = BufferStream.super_.prototype.pipe.call(this, destination, options)
  res.write(this.buffer)
  return res
}
In this way, when we write buffer.pipe(someStream), we perform the pipe as intended and write the internal buffer to the output stream. After that, the Transform class takes care of everything, while keeping track of the backpressure and whatnot.
Here is a working gist. Please note that I didn't bother writing correct buffering logic (i.e. I don't care about the size of the internal buffer), but this should be easy to fix.
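As a rough idea of what the missing buffering logic might look like, here is a minimal sketch that keeps only the most recent bytes and replays them on every .pipe() call. The names `ReplayStream`, `limit`, `chunks`, and `snapshot` are made up for illustration, and the code assumes a non-object-mode stream, a positive `limit`, and a Node.js version that has `readableFlowing` (9.4 or later):

```javascript
const { Transform } = require('stream')

// ReplayStream is a hypothetical name; `limit` is the number of recent bytes to keep.
class ReplayStream extends Transform {
  constructor (limit, streamOptions) {
    super(streamOptions)
    this.limit = limit
    this.chunks = [] // recent chunks, oldest first
    this.size = 0 // total bytes currently held in this.chunks
  }

  _transform (chunk, encoding, done) {
    this.chunks.push(chunk)
    this.size += chunk.length
    // Drop whole chunks from the front while the rest still covers the limit.
    while (this.chunks.length > 1 && this.size - this.chunks[0].length >= this.limit) {
      this.size -= this.chunks.shift().length
    }
    // Trim the oldest remaining chunk so that at most `limit` bytes are kept.
    if (this.size > this.limit) {
      this.chunks[0] = this.chunks[0].subarray(this.size - this.limit)
      this.size = this.limit
    }
    // readableFlowing stays null until a consumer attaches; until then the
    // data lives only in the ring, so the source is not stalled by backpressure.
    if (this.readableFlowing !== null) this.push(chunk)
    done()
  }

  // Copy of the buffered bytes, oldest first.
  snapshot () {
    return Buffer.concat(this.chunks, this.size)
  }

  pipe (destination, options) {
    // Replay first so the destination sees buffered data before live data.
    if (this.size > 0) destination.write(this.snapshot())
    return super.pipe(destination, options)
  }
}
```

With this, `new ReplayStream(512 * 1024)` would hold roughly the last 512KB. One caveat: a chunk that has been pushed but not yet delivered at the moment a new destination is piped is also part of the snapshot, so that destination may see it twice. The sketch also ignores the return value of destination.write(), so the replay itself does not respect backpressure.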
Upvotes: 8