Reputation: 36351
How can I take a writable stream and return a readable stream from a buffer?
I have the following to write the data that comes from an ftp server to an array of chunks:
let chunks = []
let writable = new Writable()
writable._write = (chunk, encoding, callback) => {
  chunks.push(chunk)
  callback()
}
I am then creating a new readstream:
let readable = new ReadStream()
I then tried to pipe the writable to the readable but that doesn't seem to work:
Argument of type 'ReadStream' is not assignable to parameter of type 'WritableStream'.
writable.pipe(readable)
Here is the entire method:
export class FTP {
  readStream(filePath, options = {}) {
    let conn = this.getConnection(this.name)
    if (!conn) return Buffer.from('')
    filePath = this.forceRoot(filePath)
    let chunks = []
    let writable = new Writable()
    writable._write = (chunk, encoding, callback) => {
      chunks.push(chunk)
      callback()
    }
    let readable = new ReadStream()
    conn.client.download(writable, filePath, options.start || undefined)
    writable.pipe(readable)
    return readable
  }
}
I then read from the stream and pipe the output to the response object created by http.createServer(), like this:
let stream = store.readStream(file, { start, end })
  .on('open', () => stream.pipe(res))
  .on('close', () => res.end())
  .on('error', err => res.end(err))
Upvotes: 4
Views: 6907
Reputation: 6709
Yep, Node.js streams are hard to grasp. Logically, you don't need two streams here. If you want to read from your FTP class as if it were a stream, you only need to implement a single readable stream. This section of the docs gives an idea of how to implement a readable stream from scratch:
const { Readable } = require('stream');

class SourceWrapper extends Readable {
  constructor(options) {
    super(options);
    this._source = getLowLevelSourceObject();
    // Every time there's data, push it into the internal buffer.
    this._source.ondata = (chunk) => {
      // If push() returns false, then stop reading from source.
      if (!this.push(chunk))
        this._source.readStop();
    };
    // When the source ends, push the EOF-signaling `null` chunk.
    this._source.onend = () => {
      this.push(null);
    };
  }
  // _read() will be called when the stream wants to pull more data in.
  // The advisory size argument is ignored in this case.
  _read(size) {
    this._source.readStart();
  }
}
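The docs example above wraps a push-style source that is not defined there. As a smaller sketch you can actually run, here is a pull-style variant in which _read() pushes directly from an in-memory array. ArrayReadable is a hypothetical name used only for illustration, not part of Node.js or of your FTP client:

```javascript
const { Readable } = require('stream');

// Hypothetical illustration: a Readable that serves chunks from an array.
class ArrayReadable extends Readable {
  constructor(chunks, options) {
    super(options);
    // Copy the input so the caller's array is not modified.
    this._chunks = chunks.slice();
  }

  // Called by the stream machinery whenever it wants more data.
  // The advisory size argument is ignored here as well.
  _read(size) {
    if (this._chunks.length === 0) {
      // No data left: push null to signal end-of-stream.
      this.push(null);
    } else {
      // Hand over one chunk per call; _read() is called again as needed.
      this.push(this._chunks.shift());
    }
  }
}
```

You would use it like any other readable, e.g. new ArrayReadable(['hello ', 'world']).pipe(process.stdout). Note that consecutive chunks may be delivered merged into one buffer, so consumers should not rely on chunk boundaries.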
However, from your example I gather that conn.client.download() expects a writable stream as an input parameter. In that case, you can simply use a standard PassThrough stream, which is a duplex stream (i.e. writable on the left side and readable on the right) with no transformation applied:
const { PassThrough } = require('stream');

export class FTP {
  readStream(filePath, options = {}) {
    let conn = this.getConnection(this.name);
    if (!conn) return Buffer.from('');
    filePath = this.forceRoot(filePath);
    const pt = new PassThrough();
    conn.client.download(pt, filePath, options.start);
    return pt;
  }
}
You can find more information on Node.js streams here and here.
UPD: Usage example:
// assume res is an [express or similar] response object.
const s = store.readStream(file, { start, end });
s.pipe(res);
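To see the PassThrough approach work without an FTP server, here is a rough, self-contained sketch. fakeDownload is a hypothetical stand-in for conn.client.download(): I am only assuming that the real method writes the file contents into the writable it is given and then ends it. readStream and collect are likewise illustrative helpers, not your actual method:

```javascript
const { PassThrough } = require('stream');

// Hypothetical stand-in for conn.client.download(): writes the given
// chunks into the writable asynchronously, then ends it.
function fakeDownload(writable, chunks) {
  setImmediate(() => {
    for (const chunk of chunks) writable.write(chunk);
    writable.end();
  });
}

// Same shape as FTP#readStream above, minus the connection handling.
function readStream(chunks) {
  const pt = new PassThrough();
  fakeDownload(pt, chunks); // the "download" writes into pt...
  return pt;                // ...and the caller reads from the same object
}

// Collect everything a readable produces into a single string.
async function collect(readable) {
  const parts = [];
  for await (const chunk of readable) parts.push(chunk);
  return Buffer.concat(parts).toString();
}
```

Calling collect(readStream(['foo', 'bar'])) resolves once the fake download ends the stream. Also note that a PassThrough does not emit the 'open' event used in the question (that event belongs to fs streams), which is why the usage example pipes directly.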
Upvotes: 4
Reputation: 1332
pipe() works the other way round from what you're thinking. According to the Node.js documentation, pipe() is a method of Readable, and it accepts a Writable as its destination. You were trying to pipe a Writable to a Readable, but it is a Readable that can be piped to a Writable, not the other way round.
Try passing a PassThrough to download() and returning that same PassThrough?
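For what it's worth, here is a minimal sketch of the direction pipe() expects, using a chunk-collecting Writable like the one in the question. pipeAndCollect is a hypothetical helper name, and Readable.from() needs a reasonably recent Node.js version:

```javascript
const { Readable, Writable } = require('stream');

// Pipes a Readable built from `items` into a Writable that collects chunks,
// and resolves with the concatenated result once the Writable has finished.
function pipeAndCollect(items) {
  const chunks = [];
  const sink = new Writable({
    write(chunk, encoding, callback) {
      chunks.push(chunk);
      callback();
    },
  });
  return new Promise((resolve, reject) => {
    sink.on('finish', () => resolve(Buffer.concat(chunks).toString()));
    sink.on('error', reject);
    // Readable on the left, Writable on the right.
    Readable.from(items).pipe(sink);
  });
}
```

For example, pipeAndCollect(['foo', 'bar']) resolves after the sink emits 'finish'.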
Upvotes: 1