Reputation: 100190
I spawn a child process like so:
const n = cp.spawn('bash');
n.stdout.pipe(process.stdout);
n.stderr.pipe(process.stderr);
I am looking for a transform stream so that I can prepend something like '[child process]' to the beginning of each line from the child, so I know that the stdio is coming from the child versus the parent process.
So it would look like:
const getTransformPrepender = function (pre: string): Transform {
  return ...
}

n.stdout.pipe(getTransformPrepender('[child]')).pipe(process.stdout);
n.stderr.pipe(getTransformPrepender('[child]')).pipe(process.stderr);
Does anyone know if there is an existing transform package like this, or how to write one?
I have this:
import * as stream from 'stream';
export default function (pre: string) {
  let saved = '';
  return new stream.Transform({
    transform(chunk, encoding, cb) {
      cb(null, String(pre) + String(chunk));
    },
    flush(cb) {
      this.push(saved);
      cb();
    }
  });
}
but I am afraid it won't work in edge cases where one chunk does not contain an entire line (for very long lines, a single line can span multiple chunks).
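To see the edge case concretely: a transform like the one above prefixes each chunk rather than each line, so a line that spans two chunks gets a prefix in the middle. A minimal sketch (the names are illustrative, not from any library):

```typescript
// Naive per-chunk prefixing, reduced to a pure function:
const prefixChunk = (pre: string, chunk: string): string => pre + chunk;

// One logical line delivered as two chunks (as can happen with long lines):
const chunks = ['hello ', 'world\n'];
const output = chunks.map((c) => prefixChunk('[child] ', c)).join('');
console.log(JSON.stringify(output));
// → "[child] hello [child] world\n": the second prefix lands mid-line
```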
It looks like the answer to this is here: https://strongloop.com/strongblog/practical-examples-of-the-new-node-js-streams-api/
but with this addendum: https://twitter.com/the1mills/status/886340747275812865
Upvotes: 7
Views: 3154
Reputation: 2190
I was not satisfied with the other two answers, as they keep internal buffers while waiting for a line to finish before adding the prefix. When we have a stream of characters and some fairly simple rules for when to insert the prefix, we might as well make it unbuffered.
This is a variant that doesn't need a call to flush(), which may be useful if you are overriding stdout/stderr .write for subprocesses.
const prefixStreamUnbuffered = (prefix: string) => {
  const prefixBuf = Buffer.from(prefix)
  let endedAtLinebreak = true // Initial write adds the prefix.
  return {
    transform(chunk: Buffer, _encoding?: BufferEncoding) {
      if (chunk.length === 0) return chunk
      const EOL = Buffer.from("\n")
      const startedAfterLinebreak = endedAtLinebreak
      const lines: Buffer[] = []
      let rest = chunk
      let index: number
      while ((index = rest.indexOf("\n")) !== -1) {
        lines.push(rest.slice(0, index++))
        rest = rest.slice(index)
      }
      endedAtLinebreak = rest.length === 0
      if (!endedAtLinebreak) lines.push(rest)
      const output = startedAfterLinebreak ? [prefixBuf] : []
      lines.forEach((x, idx) => {
        const lineParts = idx ? [EOL, prefixBuf, x] : [x]
        output.push(Buffer.concat(lineParts))
      })
      if (endedAtLinebreak) output.push(EOL)
      return Buffer.concat(output)
    },
  }
}
// Tests:
const testTransform1 = prefixStreamUnbuffered("[prefix] ")
console.log(
  "testTransform1",
  testTransform1.transform(Buffer.from("hello1\nhello2\n")).toString()
)

const testTransform2 = prefixStreamUnbuffered("[prefix] ")
console.log(
  "testTransform2",
  Buffer.concat([
    testTransform2.transform(Buffer.from("hello1\n")),
    testTransform2.transform(Buffer.from("hello2\n")),
    testTransform2.transform(Buffer.from("")),
    // Empty strings are 'probably' not valid inputs to a stream, so we shouldn't
    // need to handle the `chunk.length === 0` case, but we might as well do it.
  ]).toString()
)

const testTransform3 = prefixStreamUnbuffered("[prefix] ")
console.log(
  "testTransform3",
  Buffer.concat([
    testTransform3.transform(Buffer.from("hello1")),
    testTransform3.transform(Buffer.from("\nhello2")),
    testTransform3.transform(Buffer.from("\n")),
  ]).toString()
)
Outputs:
testTransform1 [prefix] hello1
[prefix] hello2
testTransform2 [prefix] hello1
[prefix] hello2
testTransform3 [prefix] hello1
[prefix] hello2
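If you do want to pipe this through a real stream, the same unbuffered rule can be wrapped in a stream.Transform. A sketch under assumed names (prependToChunk and unbufferedPrepender are illustrative, not from the answer above): the per-chunk rule is a pure function, and the only state carried between chunks is whether the previous one ended at a line break.

```typescript
import { Transform } from 'stream';

// Pure per-chunk rule: prefix at a line start and after every interior
// newline; a trailing newline means the next chunk starts at a line break.
const prependToChunk = (
  prefix: string,
  chunk: string,
  atLineStart: boolean
): { out: string; atLineStart: boolean } => {
  if (chunk.length === 0) return { out: '', atLineStart };
  let out = atLineStart ? prefix : '';
  const endsWithEol = chunk.endsWith('\n');
  const body = endsWithEol ? chunk.slice(0, -1) : chunk;
  out += body.split('\n').join('\n' + prefix);
  if (endsWithEol) out += '\n';
  return { out, atLineStart: endsWithEol };
};

// Thin stream.Transform wrapper so it can be piped.
const unbufferedPrepender = (prefix: string) => {
  let atLineStart = true; // the very first write gets the prefix
  return new Transform({
    transform(chunk, _encoding, cb) {
      const r = prependToChunk(prefix, String(chunk), atLineStart);
      atLineStart = r.atLineStart;
      cb(null, r.out);
    },
  });
};
```

With this, something like n.stdout.pipe(unbufferedPrepender('[child] ')).pipe(process.stdout) should behave like the buffered variants, but without holding partial lines in memory.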
Upvotes: 1
Reputation: 100190
You can prepend to a stream using:
https://github.com/ORESoftware/prepend-transform
It's designed to solve the problem at hand like so:
import pt from 'prepend-transform';
import * as cp from 'child_process';
const n = cp.spawn('bash');
n.stdout.pipe(pt('child stdout: ')).pipe(process.stdout);
n.stderr.pipe(pt('child stderr: ')).pipe(process.stderr);
Upvotes: 7
Reputation: 12129
There are in total three cases that you need to correctly handle: a chunk may contain several complete lines, a single line may be split across several chunks, and the final line may end without a trailing newline when the stream closes.
Here is an actual implementation, with comments describing why each step is needed.
Please note that for performance reasons, I am not converting the Buffers into classic JS strings.
const { Transform } = require('stream')

const prefix = Buffer.from('[worker]: ')

const prepender = new Transform({
  transform(chunk, encoding, done) {
    this._rest = this._rest && this._rest.length
      ? Buffer.concat([this._rest, chunk])
      : chunk

    let index

    // As long as we keep finding newlines, keep making slices of the buffer and push them to the
    // readable side of the transform stream
    while ((index = this._rest.indexOf('\n')) !== -1) {
      // The `end` parameter is non-inclusive, so increase it to include the newline we found
      const line = this._rest.slice(0, ++index)
      // `start` is inclusive, but we are already one char ahead of the newline -> all good
      this._rest = this._rest.slice(index)
      // We have a single line here! Prepend the string we want
      this.push(Buffer.concat([prefix, line]))
    }

    return void done()
  },

  // Called before the end of the input so we can handle any remaining
  // data that we have saved
  flush(done) {
    // If we have any remaining data in the cache, send it out
    if (this._rest && this._rest.length) {
      return void done(null, Buffer.concat([prefix, this._rest]))
    }
    // Nothing left over - still signal completion
    done()
  },
})

process.stdin.pipe(prepender).pipe(process.stdout)
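As a quick sanity check of the split-line case, the same approach can be exercised by writing chunks and reading the result back. A self-contained sketch (makePrepender is an illustrative re-declaration, not part of the answer above):

```typescript
import { Transform } from 'stream';

const makePrepender = (prefix: Buffer) => {
  let rest = Buffer.alloc(0); // partial line carried over between chunks
  return new Transform({
    transform(chunk: Buffer, _encoding, done) {
      rest = Buffer.concat([rest, chunk]);
      let index: number;
      // Emit every complete line, prefixed; keep the remainder buffered
      while ((index = rest.indexOf('\n')) !== -1) {
        const line = rest.slice(0, ++index);
        rest = rest.slice(index);
        this.push(Buffer.concat([prefix, line]));
      }
      done();
    },
    flush(done) {
      // Emit any unterminated final line when the stream ends
      if (rest.length) return void done(null, Buffer.concat([prefix, rest]));
      done();
    },
  });
};

const p = makePrepender(Buffer.from('[worker]: '));
p.write('hel');         // no newline yet, so nothing is emitted
p.write('lo\nworld\n'); // completes one line and delivers a second
const out = String(p.read());
console.log(out);
```

The line split across the two writes comes out as a single prefixed line, which is exactly the second of the three cases above.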
Upvotes: 10