Reputation: 100000
I just created a simple Readable and Writable stream pair which can be connected with pipe(). I am interested in creating backpressure and controlling the rate at which reads occur in the Readable. However, I am a bit confused about how to actually implement this, or whether it's even possible with Node.js streams. As an example:
const {Writable, Readable} = require('stream');

function getWritable() {
  return new Writable({
    write: function (chunk, encoding, cb) {
      console.log(' => chunk => ', String(chunk));
      setTimeout(cb, 1500);
    }
  });
}

function getReadable(data) {
  return new Readable({
    encoding: 'utf8',
    objectMode: false,
    read: function (n) {
      // n => Number(16384)
      console.log('read is called');
      const d = data.shift();
      this.push(d ? String(d) : null);
    }
  });
}

const readableStrm = getReadable([1, 2, 3, 4, 5]);
const piped = readableStrm.pipe(getWritable());

piped.on('finish', function () {
  console.log('finish');
});
If you run the above code, you will see that 'read is called' gets logged 5 times, long before the write method in the Writable sees the data.
What I would like to do is only invoke read() in the Readable when the write method in the Writable has fired its callback; of course read() would have to fire once first, but subsequently it would wait for the writable to be ready.
Is there a way to somehow control when the read() method fires in the Readable?
Ultimately, I really do not understand what the purpose of the read() method is.
As a simple example, no matter what I return from read(), I cannot get it to stop reading. What is the point of the read method, and why must we implement it?
const Readable = require('stream').Readable;

const r = new Readable({
  objectMode: true,
  read: function (n) {
    console.log('is read');
    return null; // nothing I return here (false, null, true) makes a difference
  }
});

r.on('data', function (d) {
  console.log(d);
});

setInterval(function () {
  r.push('valid');
}, 1000);
Upvotes: 2
Views: 1913
Reputation: 4787
Node.js streams are very powerful and provide a lot of control over the buffering and flow of data through them.
Now to answer your questions:
A) You see all the data being read first and the writes firing afterwards because your data stream is very small. If you did the test with kilobytes of data, you would see the read stream and write stream firing in sequence. The sequence depends on the buffering capacity of the read stream and the back-pressure created by the write stream. For example, a TCP socket read stream will be much faster than a disk-file write stream, creating backpressure.
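To make the backpressure visible, here is a minimal sketch of my own (not part of the original answer; the chunk size, chunk count and delay are arbitrary) that pushes larger chunks into a deliberately slow writable. 'read is called' stops being logged once the internal buffers (16 KB each by default) fill up, and resumes as the slow writable drains them.

const { Readable, Writable } = require('stream');

let produced = 0;
const readable = new Readable({
  read: function () {
    console.log('read is called', ++produced);
    // push 1 KB chunks, 100 in total, then end the stream
    this.push(produced <= 100 ? Buffer.alloc(1024, 'a') : null);
  }
});

const writable = new Writable({
  write: function (chunk, encoding, cb) {
    console.log('wrote', chunk.length, 'bytes');
    setTimeout(cb, 100); // deliberately slow consumer
  }
});

readable.pipe(writable).on('finish', function () {
  console.log('finish');
});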
B) One powerful constructor option for read & write streams is highWaterMark. You can read more about it in the Buffering section of the documentation. highWaterMark defines the buffering capacity of readable/writable streams; the default value is 16 KB. In your above example you can set up your readable stream with a highWaterMark of 2 bytes as shown below and you will see the difference (never required in a practical situation, but you can use it for learning).
const stream = require('stream');

function getReadable(data) {
  let i = 0;
  return new stream.Readable({
    highWaterMark: 2, // <--- highWaterMark set to 2 bytes. Preferably set it to 10 and increase the length of your input array.
    encoding: 'utf8',
    objectMode: false,
    read: function (n) {
      // n => Number(2), i.e. the highWaterMark
      console.log('read is called');
      const d = data[i++];
      this.push(d ? String(d) : null);
    }
  });
}
A smaller highWaterMark value will create backpressure very soon and may be bad for certain use cases, such as reading network data.
C) You can also control the flow of data in the read stream and write stream. If your application requires it, you can control the readable stream's flow from the writable stream. The methods readable.pause(), readable.read([size]), readable.resume(), readable.push(chunk[, encoding]) and readable.unpipe([destination]) allow you to control the buffering & flow of data in the readable stream (even from your writable stream). In fact, you can even push consumed data back into the readable stream's buffer using readable.unshift(chunk). There are similar methods for controlling data in the writable stream.
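As a rough illustration of pause()/resume() (my own sketch, not from the question's code, with an arbitrary 1-second delay), the consumer below throttles itself by pausing after every chunk and resuming later. Note that this throttles the delivery of 'data' events; the stream may still call its internal read() ahead of time to keep its buffer filled up to highWaterMark.

const { Readable } = require('stream');

const data = [1, 2, 3, 4, 5];

const readable = new Readable({
  encoding: 'utf8',
  read: function () {
    const d = data.shift();
    this.push(d !== undefined ? String(d) : null);
  }
});

readable.on('data', function (chunk) {
  console.log('got chunk =>', chunk);
  readable.pause();                                     // stop 'data' events
  setTimeout(function () { readable.resume(); }, 1000); // resume after doing some work
});

readable.on('end', function () {
  console.log('end');
});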
D) The read and write methods are part of your implementation of the stream. They are how your stream pulls data from, or sends data to, the underlying resource, and they should not be called by your application code directly. Basically, they define the setup of your stream. (Not sure if I am able to explain this clearly.)
I strongly suggest that you read the Node.js documentation on streams. It will give you a lot of information (more than what you will get from the sample code you will find on various other sites).
I hope the above information helps you.
Upvotes: 2