Get Off My Lawn

Reputation: 36351

How to pipe Writable Buffer to a ReadStream?

How can I take a writable stream and return a readable stream from a buffer?

I have the following to write the data that comes from an ftp server to an array of chunks:

let chunks = []
let writable = new Writable
writable._write = (chunk, encoding, callback) => {
  chunks.push(chunk)
  callback()
}

I am then creating a new readstream:

let readable = new ReadStream()

I then tried to pipe the writable to the readable but that doesn't seem to work:

Argument of type 'ReadStream' is not assignable to parameter of type 'WritableStream'.

writable.pipe(readable)

Here is the entire method:

export class FTP {
  readStream(filePath, options = {}) {
    let conn = this.getConnection(this.name)
    if (!conn) return Buffer.from('')
    filePath = this.forceRoot(filePath) 
    let chunks = []
    let writable = new Writable
    writable._write = (chunk, encoding, callback) => {
      chunks.push(chunk)
      callback()
    }
    let readable = new ReadStream()

    conn.client.download(writable, filePath, options.start || undefined)
    writable.pipe(readable)
    return readable
  }
}

I then read from the stream and pipe the output to the response object created from http.createServer() like this:

      let stream = store.readStream(file, { start, end })
        .on('open', () => stream.pipe(res))
        .on('close', () => res.end())
        .on('error', err => res.end(err))

Upvotes: 4

Views: 6907

Answers (2)

Ivan Velichko

Reputation: 6709

Yep, Node.js streams are hard to grasp. Logically, you don't need two streams here. If you want to read from your FTP class as if it were a stream, you only need to implement a single readable stream. The Node.js stream docs show how to implement a readable stream from scratch:

class SourceWrapper extends Readable {
  constructor(options) {
    super(options);

    this._source = getLowLevelSourceObject();

    // Every time there's data, push it into the internal buffer.
    this._source.ondata = (chunk) => {
      // If push() returns false, then stop reading from source.
      if (!this.push(chunk))
        this._source.readStop();
    };

    // When the source ends, push the EOF-signaling `null` chunk.
    this._source.onend = () => {
      this.push(null);
    };
  }
  // _read() will be called when the stream wants to pull more data in.
  // The advisory size argument is ignored in this case.
  _read(size) {
    this._source.readStart();
  }
}
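The docs example above depends on a low-level source object that isn't shown, so it can't be run as is. Here is a minimal runnable sketch of the same idea, assuming the data is already available as a list of chunks. ArraySource is a made-up name for illustration, not part of any library:

```javascript
const { Readable } = require('stream');

// Hypothetical example class: serves a fixed list of chunks, then ends.
class ArraySource extends Readable {
  constructor(chunks, options) {
    super(options);
    this._chunks = chunks.slice(); // copy so the caller's array is untouched
  }

  // Called whenever the stream wants more data.
  _read() {
    if (this._chunks.length === 0) {
      this.push(null); // null signals end-of-stream
    } else {
      this.push(this._chunks.shift());
    }
  }
}

const out = [];
const src = new ArraySource(['one ', 'two ', 'three']);
src.on('data', (chunk) => out.push(chunk.toString()));
src.on('end', () => console.log(out.join('')));
```

An instance of such a class can be piped straight into a response object, with no second stream involved.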

However, from your example I can conclude that conn.client.download() expects a writable stream as an input parameter. In that case you can simply use a standard PassThrough stream, which is a duplex stream (i.e. writable on the left side and readable on the right) with no transformation applied:

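import { PassThrough } from 'stream';

export class FTP {
  readStream(filePath, options = {}) {
    // PassThrough is writable on one side and readable on the other.
    const pt = new PassThrough();

    let conn = this.getConnection(this.name);
    if (!conn) {
      // No connection: return an already-ended (empty) stream rather than a
      // Buffer, so callers can always call .pipe() on the result.
      pt.end();
      return pt;
    }
    filePath = this.forceRoot(filePath);

    // download() writes the file into pt; the caller reads from the same object.
    conn.client.download(pt, filePath, options.start);
    return pt;
  }
}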

The Node.js stream documentation covers these stream types in more detail.

UPD: Usage example:

// assume res is an [express or similar] response object.
const s = store.readStream(file, { start, end });
s.pipe(res);
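If you want to try the PassThrough approach without an FTP server, here is a small self-contained sketch. fakeDownload() is a hypothetical stand-in for conn.client.download(), and collect() is just a helper for the demo; neither comes from your code or from any library:

```javascript
const { PassThrough } = require('stream');

// Hypothetical stand-in for conn.client.download(): it only knows how to
// write into the Writable it is given.
function fakeDownload(writable) {
  writable.write('hello ');
  writable.write('world');
  writable.end();
}

// Returns a Readable whose data is whatever fakeDownload() writes.
function readStream() {
  const pt = new PassThrough();
  fakeDownload(pt);
  return pt;
}

// Demo helper: collect everything a Readable emits into one string.
function collect(readable) {
  return new Promise((resolve, reject) => {
    const chunks = [];
    readable.on('data', (chunk) => chunks.push(chunk));
    readable.on('end', () => resolve(Buffer.concat(chunks).toString()));
    readable.on('error', reject);
  });
}

collect(readStream()).then((text) => console.log(text));
```

The same object is handed to the writer and returned to the reader, which is why no explicit pipe() between two separate streams is needed.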

Upvotes: 4

Amit Beckenstein

Reputation: 1332

pipe() works the other way round from what you're thinking. According to the Node.js documentation, pipe() is a method of Readable, and it accepts a Writable as its destination. You were trying to pipe a Writable into a Readable, but it is the Readable that gets piped into a Writable.
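To illustrate the direction, here is a minimal sketch with a made-up source and sink (the names source, sink and received are only for this example):

```javascript
const { Readable, Writable } = require('stream');

// Source: a Readable built from an array of chunks.
const source = Readable.from(['a', 'b', 'c']);

// Destination: a Writable that collects whatever it receives.
const received = [];
const sink = new Writable({
  write(chunk, encoding, callback) {
    received.push(chunk.toString());
    callback();
  },
});

// Data flows from the Readable into the Writable.
source.pipe(sink);

// 'finish' fires once the sink has processed everything it was sent.
sink.on('finish', () => console.log(received.join('')));
```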

Try passing a PassThrough to download() and returning that same PassThrough.

Upvotes: 1
