Alexander Mills

Reputation: 100190

Transform stream to prepend string to each line

I spawn a child process like so:

const n = cp.spawn('bash');

n.stdout.pipe(process.stdout);
n.stderr.pipe(process.stderr);

I am looking for a transform stream so that I can prepend something like '[child process]' to the beginning of each line from the child, so I know that the stdio is coming from the child versus the parent process.

So it would look like:

const getTransformPrepender = function(pre: string): Transform {
   return ...
}

n.stdout.pipe(getTransformPrepender('[child]')).pipe(process.stdout);
n.stderr.pipe(getTransformPrepender('[child]')).pipe(process.stderr);

Does anyone know of an existing transform package like this, or how to write one?

I have this:

import * as stream from 'stream';


export default function(pre: string){

  let saved = '';

  return new stream.Transform({

    transform(chunk, encoding, cb) {

      cb(null, String(pre) + String(chunk));
    },

    flush(cb) {
      this.push(saved);
      cb();
    }

  });

}

but I am afraid it won't work in edge cases, for example when a chunk does not contain a complete line (as can happen with very long lines).
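
To make the concern concrete, here is a minimal sketch of what per-chunk prefixing does to chunk boundaries. `naivePrefix` is a hypothetical helper that only mimics the `transform` body above on an array of chunks; it is not a stream.

```javascript
// Hypothetical helper mirroring the transform above: prefix each chunk as-is.
function naivePrefix(pre, chunks) {
  return chunks.map((chunk) => String(pre) + String(chunk)).join('')
}

// One line split over two chunks gets a prefix in the middle of the line.
const splitLine = naivePrefix('[child] ', ['hel', 'lo\n'])
console.log(JSON.stringify(splitLine))
// "[child] hel[child] lo\n"

// Two lines arriving in one chunk get only a single prefix.
const twoLines = naivePrefix('[child] ', ['a\nb\n'])
console.log(JSON.stringify(twoLines))
// "[child] a\nb\n"
```

So the prefix has to follow newlines rather than chunk boundaries.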

It looks like the answer to this is here: https://strongloop.com/strongblog/practical-examples-of-the-new-node-js-streams-api/

but with this addendum: https://twitter.com/the1mills/status/886340747275812865

Upvotes: 7

Views: 3154

Answers (3)

dsschneidermann

Reputation: 2190

I am not satisfied with the other two answers: they keep an internal buffer while waiting for a line to finish and only then add the prefix. Since we have a stream of characters and fairly simple rules for when to insert the prefix, we might as well make it unbuffered.

This is a variant that doesn't need a call to flush(), which may be useful if you are overriding stdout/stderr.write for subprocesses.

const prefixStreamUnbuffered = (prefix: string) => {
    const prefixBuf = Buffer.from(prefix)
    let endedAtLinebreak = true // Initial write adds the prefix.

    return {
        transform(chunk: Buffer, _encoding?: BufferEncoding) {
            if (chunk.length === 0) return chunk

            const EOL = Buffer.from("\n")
            const startedAfterLinebreak = endedAtLinebreak

            const lines: Buffer[] = []
            let rest = chunk
            let index: number
            while ((index = rest.indexOf("\n")) !== -1) {
                lines.push(rest.slice(0, index++))
                rest = rest.slice(index)
            }
            endedAtLinebreak = rest.length === 0
            if (!endedAtLinebreak) lines.push(rest)

            const output = startedAfterLinebreak ? [prefixBuf] : []
            lines.forEach((x, idx) => {
                const lineParts = idx ? [EOL, prefixBuf, x] : [x]
                output.push(Buffer.concat(lineParts))
            })
            if (endedAtLinebreak) output.push(EOL)
            return Buffer.concat(output)
        },
    }
}

// Tests:
const testTransform1 = prefixStreamUnbuffered("[prefix] ")
console.log(
    "testTransform1",
    testTransform1.transform(Buffer.from("hello1\nhello2\n")).toString()
)
const testTransform2 = prefixStreamUnbuffered("[prefix] ")
console.log(
    "testTransform2",
    Buffer.concat([
        testTransform2.transform(Buffer.from("hello1\n")),
        testTransform2.transform(Buffer.from("hello2\n")),
        testTransform2.transform(Buffer.from("")),
        // Empty strings are 'probably' not valid inputs to a stream, so we shouldn't
        // need to handle the `chunk.length === 0` case, but we might as well do it.
    ]).toString()
)
const testTransform3 = prefixStreamUnbuffered("[prefix] ")
console.log(
    "testTransform3",
    Buffer.concat([
        testTransform3.transform(Buffer.from("hello1")),
        testTransform3.transform(Buffer.from("\nhello2")),
        testTransform3.transform(Buffer.from("\n")),
    ]).toString()
)

Outputs:

testTransform1 [prefix] hello1
[prefix] hello2

testTransform2 [prefix] hello1
[prefix] hello2

testTransform3 [prefix] hello1
[prefix] hello2
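
The object above only exposes a `transform` function, so it cannot be passed to `.pipe()` directly. As a rough sketch, the same unbuffered idea could be wrapped in a real `stream.Transform` like this. Note that `createUnbufferedPrefixer` is a hypothetical name, and the body is a simplified reimplementation of the approach rather than the exact code above.

```javascript
const { Transform } = require('stream')

// Hypothetical factory (assumed name): an unbuffered line prefixer as a Transform.
function createUnbufferedPrefixer(prefix) {
  const prefixBuf = Buffer.from(prefix)
  let atLineStart = true // true until the first byte of the current line is written

  return new Transform({
    transform(chunk, _encoding, done) {
      const parts = []
      let start = 0
      while (start < chunk.length) {
        // Emit the prefix only once the line actually has content.
        if (atLineStart) {
          parts.push(prefixBuf)
          atLineStart = false
        }
        const nl = chunk.indexOf(0x0a, start) // next "\n" byte, or -1
        if (nl === -1) {
          // Partial line: pass it through now, no buffering.
          parts.push(chunk.subarray(start))
          break
        }
        parts.push(chunk.subarray(start, nl + 1))
        atLineStart = true
        start = nl + 1
      }
      done(null, Buffer.concat(parts))
    },
  })
}

// Usage, mirroring the question (not run here):
// n.stdout.pipe(createUnbufferedPrefixer('[child] ')).pipe(process.stdout)
```

Since nothing is held back between chunks, no `flush` handler is needed.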

Upvotes: 1

Alexander Mills

Reputation: 100190

You can prepend a string to each line of a stream using:

https://github.com/ORESoftware/prepend-transform

It is designed to solve the problem at hand, like so:

import pt from 'prepend-transform';
import * as cp from 'child_process';

const n = cp.spawn('bash');

n.stdout.pipe(pt('child stdout: ')).pipe(process.stdout);
n.stderr.pipe(pt('child stderr: ')).pipe(process.stderr);

Upvotes: 7

Robert Rossmann

Reputation: 12129

There are three cases in total that you need to handle correctly:

  • A single chunk representing an entire line
  • A single chunk representing multiple lines
  • A single chunk representing only part of the line

Here is an algorithm that handles all three situations:

  1. Receive a chunk of data
  2. Scan the chunk for newlines
  3. As soon as a newline is found, take everything before it (including the newline) and send it out as a single line entry with any modifications you need
  4. Repeat until the whole chunk has been processed (no remaining data) or until no additional newlines have been found (some data remains, save it for later)

And here is an actual implementation, with comments explaining why each step is needed.

Please note that for performance reasons, I am not converting the Buffers into classic JS strings.

const { Transform } = require('stream')

const prefix = Buffer.from('[worker]: ')

const prepender = new Transform({
  transform(chunk, encoding, done) {
    this._rest = this._rest && this._rest.length
      ? Buffer.concat([this._rest, chunk])
      : chunk

    let index

    // As long as we keep finding newlines, keep making slices of the buffer and push them to the
    // readable side of the transform stream
    while ((index = this._rest.indexOf('\n')) !== -1) {
      // The `end` parameter is non-inclusive, so increase it to include the newline we found
      const line = this._rest.slice(0, ++index)
      // `start` is inclusive, but we are already one char ahead of the newline -> all good
      this._rest = this._rest.slice(index)
      // We have a single line here! Prepend the string we want
      this.push(Buffer.concat([prefix, line]))
    }

    return void done()
  },

  // Called before the end of the input so we can handle any remaining
  // data that we have saved
  flush(done) {
    // If we have any remaining data in the cache, send it out
    if (this._rest && this._rest.length) {
      this.push(Buffer.concat([prefix, this._rest]))
    }
    // Always signal completion, otherwise the stream never finishes
    return void done()
  },
})

process.stdin.pipe(prepender).pipe(process.stdout)
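
A single `Transform` instance cannot be shared between stdout and stderr, so for the use case in the question it probably makes sense to wrap the same logic in a factory. The following is only a sketch under that assumption; `createPrepender` is a hypothetical name, and the leftover data lives in a closure variable instead of `this._rest`.

```javascript
const { Transform } = require('stream')

// Hypothetical factory (assumed name): returns a fresh line-prefixing Transform per call.
function createPrepender(prefixText) {
  const prefix = Buffer.from(prefixText)
  let rest = Buffer.alloc(0) // bytes of the current, still incomplete line

  return new Transform({
    transform(chunk, _encoding, done) {
      rest = rest.length ? Buffer.concat([rest, chunk]) : chunk
      let index
      // Push every complete line (including its newline) with the prefix.
      while ((index = rest.indexOf('\n')) !== -1) {
        this.push(Buffer.concat([prefix, rest.subarray(0, index + 1)]))
        rest = rest.subarray(index + 1)
      }
      done()
    },
    flush(done) {
      // Send out a trailing line that never received its newline.
      if (rest.length) this.push(Buffer.concat([prefix, rest]))
      done()
    },
  })
}

// Usage, mirroring the question (not run here):
// const n = require('child_process').spawn('bash')
// n.stdout.pipe(createPrepender('[child] ')).pipe(process.stdout)
// n.stderr.pipe(createPrepender('[child] ')).pipe(process.stderr)
```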

Upvotes: 10
