Reputation: 692
I have an infinite data stream from a forked process. I want this stream to be processed by a module and sometimes I want to duplicate the data from this stream to be processed by a different module (e.g. monitoring a data stream but if anything interesting happens I want to log the next n bytes to file for further investigation).
So let's suppose the following scenario:
Here is a code snippet for this:
// Source stream; its data is always piped into `detector` (defined elsewhere).
var stream = process.stdout;
stream.pipe(detector); // Using the first consumer
// Starts duplicating the source into a fresh PassThrough stored in `stream2`.
function startAnotherConsumer() {
// NOTE: `stream2` is assigned here without any visible declaration.
stream2 = new PassThrough();
stream.pipe(stream2);
// use stream2 somewhere else
}
// Stops feeding `stream2`. This only unpipes it; nothing here ends `stream2`.
function stopAnotherConsumer() {
stream.unpipe(stream2);
}
My problem here is that unpiping stream2 doesn't close it. If I call stream.end() after the unpipe command, it crashes with the error:
events.js:160
throw er; // Unhandled 'error' event
^
Error: write after end
at writeAfterEnd (_stream_writable.js:192:12)
at PassThrough.Writable.write (_stream_writable.js:243:5)
at Socket.ondata (_stream_readable.js:555:20)
at emitOne (events.js:101:20)
at Socket.emit (events.js:188:7)
at readableAddChunk (_stream_readable.js:176:18)
at Socket.Readable.push (_stream_readable.js:134:10)
at Pipe.onread (net.js:548:20)
I even tried to pause the source stream to help the buffer to be flushed from the second stream but it didn't work either:
// Second attempt: pause the source, end `stream2` once it emits 'unpipe',
// then trigger the unpipe.
function stopAnotherConsumer() {
stream.pause();
// The handler is registered before `unpipe` is called below.
stream2.once('unpipe', function () {
stream.resume();
stream2.end();
});
stream.unpipe(stream2);
}
Same error as before here (write after end).
How to solve the problem? My original intent is to duplicate the streamed data from one point, then close the second stream after a while.
Note: I tried to use this answer to make it work.
Upvotes: 0
Views: 1395
Reputation: 692
As there were no answers, I am posting my own (patchwork) solution. If anyone has a better one, please share it.
A new Stream:
const Writable = require('stream').Writable;
const Transform = require('stream').Transform;
/**
 * Transform stream that forwards every chunk unchanged and, while a secondary
 * stream is attached, also writes each chunk to that stream.
 *
 * Emits:
 *  - 'attach' (stream) after a secondary stream has been attached
 *  - 'detach' (stream) after the secondary stream has been detached
 *
 * The secondary stream is never ended by this class; the caller decides what
 * to do with it (for example in a 'detach' listener).
 */
class DuplicatorStream extends Transform {
  /**
   * @param {object} [options] Options passed straight to the Transform constructor.
   */
  constructor(options) {
    super(options);
    // Currently attached secondary stream, or null when none is attached.
    this.otherStream = null;
  }

  /**
   * Start copying every chunk to `stream` in addition to the normal output.
   *
   * @param {Writable} stream Writable stream that receives the copies.
   * @throws {Error} If `stream` is not a Writable, or a stream is already attached.
   */
  attachStream(stream) {
    // Parentheses are required: `!stream instanceof Writable` is parsed as
    // `(!stream) instanceof Writable`, which is always false.
    if (!(stream instanceof Writable)) {
      throw new Error('DuplicatorStream argument is not a writeable stream!');
    }
    if (this.otherStream) {
      throw new Error('A stream is already attached!');
    }
    this.otherStream = stream;
    this.emit('attach', stream);
  }

  /**
   * Stop copying chunks to the attached stream and emit 'detach' with it.
   *
   * @throws {Error} If no stream is currently attached.
   */
  detachStream() {
    if (!this.otherStream) {
      throw new Error('No stream to detach!');
    }
    const stream = this.otherStream;
    // Clear the reference before emitting so that listeners which end the
    // stream cannot receive another write from _transform.
    this.otherStream = null;
    this.emit('detach', stream);
  }

  /**
   * Copy the chunk to the attached stream (if any), then pass it through.
   * The return value of write() on the attached stream is not inspected.
   */
  _transform(chunk, encoding, callback) {
    if (this.otherStream) {
      this.otherStream.write(chunk);
    }
    callback(null, chunk);
  }
}
module.exports = DuplicatorStream;
And the usage:
// Usage example. `DuplicatorStream` (the class shown earlier) and `detector`
// (the primary consumer) must already be in scope.
// `PassThrough` comes from the stream module; it is not a property of the
// local `stream` variable, so `new stream.PassThrough()` would throw.
const { PassThrough } = require('stream');

var stream = process.stdout;
var stream2;
const duplicatorStream = new DuplicatorStream();
stream.pipe(duplicatorStream); // Inserting my duplicator stream in the chain
duplicatorStream.pipe(detector); // Using the first consumer

// Starts copying the data into a fresh PassThrough stored in `stream2`.
function startAnotherConsumer() {
  stream2 = new PassThrough();
  duplicatorStream.attachStream(stream2);
  // use stream2 somewhere else
}

// Detaches `stream2` and ends it once the duplicator has emitted 'detach'.
function stopAnotherConsumer() {
  duplicatorStream.once('detach', function () {
    stream2.end();
  });
  duplicatorStream.detachStream();
}
Upvotes: 1