Franck Freiburger
Franck Freiburger

Reputation: 28448

node.js: create a connected writable and readable stream pair

I am trying to create a function that returns a connected writable and readable stream pair. eg:

const { writable, readable } = createStreamPair();

where each end has the right interface (writable instanceof stream.Readable === false and readable instanceof stream.Writable === false) unlike the PassThrough stream.

use case:

createWriteStream(filePath) {

    const { writable, readable } = createStreamPair();
    writeFile(filePath, readable);
    return writable;
}

How to create my createStreamPair() function ?

Edit1

A naive approach that obviously does not work ...

function createStreamPair() {

    var readable = new stream.Readable();
    var writable = new stream.Writable();
    readable.pipe(writable);
    return { writable, readable }
}

Upvotes: 2

Views: 838

Answers (3)

JamesTheAwesomeDude
JamesTheAwesomeDude

Reputation: 1053

No need to overcomplicate things.

let { writable, readable } = new TransformStream();

  1. This is fundamentally the same recipe that Node's stream.PassThrough class comprises.
  2. It looks like this explicitly optimized both in Chrome and in Firefox.
  3. The Streams spec document explicitly prescribes this pattern:

An identity transform stream is a type of transform stream which forwards all chunks written to its writable side to its readable side, without any changes. This can be useful in a variety of scenarios*. By default, the TransformStream constructor will create an identity transform stream, when no transform() method is present on the transformer object.

*One use of identity transform streams is to easily convert between readable and writable streams. For example, the fetch() API accepts a readable stream request body, but it can be more convenient to write data for uploading via a writable stream interface. Using an identity transform stream addresses this:

const { writable, readable } = new TransformStream();
fetch("...", { body: readable }).then(response => /* ... */);

const writer = writable.getWriter();
writer.write(new Uint8Array([0x73, 0x74, 0x72, 0x65, 0x61, 0x6D, 0x73, 0x21]));
writer.close();

Upvotes: 0

gobien
gobien

Reputation: 537

As of today you can use stream.PassTrough

Upvotes: -1

awwright
awwright

Reputation: 645

The Node.js tests uses a function that creates two Duplex streams, writes to one can be read from the other, and vice-versa: https://github.com/nodejs/node/blob/master/test/common/duplexpair.js

It isn't part of the Node.js standard library, but you can write your own.

I'll present a slightly modified, annotated version here:

const Duplex = require('stream').Duplex;
const assert = require('assert');

// Define some unique property names.
// The actual value doesn't matter,
// so long as they're not used by Node.js for anything else.
const kCallback = Symbol('Callback');
const kOtherSide = Symbol('Other');

// Define a function `DuplexSocket` whose prototype inherits from `Duplex`
class DuplexSocket extends Duplex {
    constructor() {
        // Let Node.js initialize everything it needs to
        super();
        // Define two values we will be using
        // kCallback saves a temporary reference to a function while
        this[kCallback] = null;
        // kOtherSide will be the reference to the other side of the stream
        this[kOtherSide] = null;
    }

    _read() {
        // This is called when this side receives a push() call
        // If the other side set a callback for us to call,
        // then first clear that reference
        // (it might be immediately set to a new value again),
        // then call the function.
        const callback = this[kCallback];
        if (callback) {
            this[kCallback] = null;
            callback();
        }
    }

    _write(chunk, encoding, callback) {
        // This is called when someone writes to the stream
        // Ensure there's a reference to the other side before trying to call it
        assert.notStrictEqual(this[kOtherSide], null);
        // Ensure that the other-side callback is empty before setting it
        // If push immediately calls _read, this should never be a problem
        assert.strictEqual(this[kOtherSide][kCallback], null);
        if (chunk.length === 0) {
            // callback is not called for zero-length chunks
            process.nextTick(callback);
        } else {
            // Let Node.js know when _read has been called
            this[kOtherSide][kCallback] = callback;
            // And finally, send the other side the data to be read
            this[kOtherSide].push(chunk);
        }
    }

    _final(callback) {
        // Ask the other side to let us know it received our EOF request
        this[kOtherSide].on('end', callback);
        // And finally, pushing null signals the end of the stream
        this[kOtherSide].push(null);
    }
}

function makeDuplexPair() {
    // Create two pairs of 
    const clientSide = new DuplexSocket();
    const serverSide = new DuplexSocket();
    // Set the other-side reference
    clientSide[kOtherSide] = serverSide;
    serverSide[kOtherSide] = clientSide;
    // Both instances behave the same, so choice of name doesn't matter,
    // So long as they're distinguishable.
    return { clientSide, serverSide };
}

module.exports = makeDuplexPair;

Here's another way of creating two streams, one Readable and one Writable in this case:

function makeAsymmetricalStreamPair() {
    var readableCallback;
    const readableSide = new ReadableStream;
    readableSide._read = function _read(){
        if(!readableCallback) return;
        var callback = readableCallback;
        readableCallback = null;
        callback();
    }
    const writableSide = new WritableStream;
    writableSide._write = function _write(chunk, enc, callback){
        if (readableCallback) throw new Error;
        if (chunk.length === 0) {
            process.nextTick(callback);
        } else {
            readableCallback = callback;
            readableSide.push(chunk);
        }
    }
    writableSide._final = function _final(callback){
        readableSide.on('end', callback);
        readableSide.push(null);
    }
    return { readableSide, writableSide };
}

Upvotes: 2

Related Questions