B T
B T

Reputation: 60875

How do you implement a stream that properly handles backpressure in node.js?

I can't for the life of me figure out how to implement a stream that properly handles backpressure. Should you never use pause and resume?

I have this implementation I'm trying to get to work correctly:

// Readable stream that re-pushes whatever it reads from myStream and also hands
// each chunk to callback.
// parameters:
    // myStream - the source stream to read from
    // callback - called with each non-null chunk read from myStream
var StreamPeeker = exports.StreamPeeker = function(myStream, callback) {
    // NOTE(review): highWaterMark is not declared anywhere in this snippet - presumably defined elsewhere; confirm
    stream.Readable.call(this, {highWaterMark: highWaterMark})
    this.stream = myStream

    // The source is read as soon as it reports 'readable', independent of whether _read has been called
    myStream.on('readable', function() {
        var data = myStream.read(5000)
        //process.stdout.write("Eff: "+data)
        if(data !== null) {
            if(!this.push(data)) {
                // push() returned false; note that pause() is invoked on this stream, not on myStream
                process.stdout.write("Pause")
                this.pause()
            }
            callback(data)
        }
    }.bind(this))

    // Signal end-of-stream on this Readable once the source emits 'end'
    myStream.on('end', function() {
        this.push(null)
    }.bind(this))
}
util.inherits(StreamPeeker, stream.Readable)
// Only logs; no data is pulled from the source here (the resume() call is commented out)
StreamPeeker.prototype._read = function() {
    process.stdout.write("resume")
    //this.resume() // putting this in for some reason causes the stream to not output???
}

It correctly sends output, but doesn't correctly produce backpressure. How can I change it to properly support backpressure?

Upvotes: 3

Views: 501

Answers (1)

B T
B T

Reputation: 60875

Ok I finally figured it out after lots of trial and error. A couple guidelines:

  • Never ever use pause or resume (otherwise it'll go into legacy "flowing" mode)
  • Never add a "data" event listener (otherwise it'll go into legacy "flowing" mode)
  • It's the implementor's responsibility to keep track of when the source is readable
  • It's the implementor's responsibility to keep track of when the destination wants more data
  • The implementation should not read any data until the _read method is called
  • The argument to read tells the source how many bytes to return, so it's probably best to pass the size argument received by this._read on to the source's read method. This way you should be able to configure how much to read at a time at the destination, and the rest of the stream chain should be automatic.

So this is what I changed it to:

Update: I created a Readable that is much easier to implement with proper back-pressure, and should have just as much flexibility as node's native streams.

var Readable = stream.Readable
var util = require('util')

// An easier Readable stream interface to implement.
// Subclasses must:
    // implement a _readSource(size) function that
        // * gets the same parameter as Readable._read (size)
        // * returns either data to push, or null if the source doesn't have more data yet
    // call 'sourceHasData(hasData)' when the source starts or stops having data available
    // call 'end()' when the source is out of data (forever)
var Stream666 = {}

// Constructor. All arguments are forwarded unchanged to stream.Readable.
// Throws an Error if the object being constructed has no _readSource function.
Stream666.Readable = function() {
    stream.Readable.apply(this, arguments)
    if(this._readSource === undefined) {
        throw new Error("You must define a _readSource function for an object implementing Stream666")
    }

    this._sourceHasData = false        // last state reported through sourceHasData (cleared when _readSource returns null)
    this._destinationWantsData = false // set by _read, cleared when push() returns false
    this._size = undefined             // size passed to the most recent _read call
}
util.inherits(Stream666.Readable, stream.Readable)

// Called when the consumer wants data.
// parameters:
    // size - the amount requested; handed on to _readSource
Stream666.Readable.prototype._read = function(size) {
    this._destinationWantsData = true
    // Always remember the requested size. The source may be flagged as having data and
    // still return null from _readSource; the deferred read triggered by a later
    // sourceHasData(true) must then use this size rather than a stale or undefined one.
    this._size = size
    if(this._sourceHasData) {
        pushSourceData(this, size)
    }
}

// Called by the subclass whenever the source starts or stops having data available.
// parameters:
    // _sourceHasData - true if the source currently has data to read, false otherwise
Stream666.Readable.prototype.sourceHasData = function(_sourceHasData) {
    this._sourceHasData = _sourceHasData
    // If a _read request is still outstanding, satisfy it now with the size it asked for
    if(_sourceHasData && this._destinationWantsData) {
        pushSourceData(this, this._size)
    }
}

// Called by the subclass when the source is out of data forever; pushes null to end the stream.
Stream666.Readable.prototype.end = function() {
    this.push(null)
}

// Reads one chunk from the source via _readSource(size) and pushes it downstream.
// Clears _destinationWantsData when push() returns false, and clears _sourceHasData
// when the source returns null.
function pushSourceData(stream666Readable, size) {
    var data = stream666Readable._readSource(size)
    if(data !== null) {
        if(!stream666Readable.push(data)) {
            stream666Readable._destinationWantsData = false
        }
    } else {
        stream666Readable._sourceHasData = false
    }
}

// A pass-through Readable that lets a callback observe each chunk read from another stream
// built on Stream666.Readable, which coordinates when the source is actually read
// parameters:
    // myStream - the stream to peek at
    // callback - invoked with every chunk read from myStream
var StreamPeeker = function(myStream, callback) {
    var self = this
    Stream666.Readable.call(self)
    self.stream = myStream
    self.callback = callback

    // the source reports that it may have data available
    myStream.on('readable', function onSourceReadable() {
        self.sourceHasData(true)
    })
    // the source is finished for good
    myStream.on('end', function onSourceEnd() {
        self.end()
    })
}
util.inherits(StreamPeeker, Stream666.Readable)

// Reads from the wrapped stream; returns the chunk (after showing it to the callback),
// or null after flagging the source as having no data
StreamPeeker.prototype._readSource = function(size) {
    var chunk = this.stream.read(size)
    if(chunk === null) {
        this.sourceHasData(false)
        return null
    }

    this.callback(chunk)
    return chunk
}

Old Answer:

// A pass-through Readable that lets a callback observe each chunk read from another stream
// only reads from the source after _read has been called, and stops once push() returns false
// parameters:
    // myStream - the stream to peek at
    // callback - invoked with every chunk read from myStream
var StreamPeeker = exports.StreamPeeker = function(myStream, callback) {
    var self = this
    stream.Readable.call(self)
    self.stream = myStream
    self.callback = callback
    self.reading = false            // true while the destination wants more data
    self.sourceIsReadable = false   // set once the source has emitted 'readable'

    myStream.on('readable', function onSourceReadable() {
        self.sourceIsReadable = true
        self._readMoreData()
    })

    myStream.on('end', function onSourceEnd() {
        self.push(null)
    })
}
util.inherits(StreamPeeker, stream.Readable)

// The destination wants data: note that, and read right away if the source has reported 'readable'
StreamPeeker.prototype._read = function() {
    this.reading = true
    if(!this.sourceIsReadable) return
    this._readMoreData()
}

// Moves one chunk from the source to the destination, provided the destination asked for data
StreamPeeker.prototype._readMoreData = function() {
    if(!this.reading) return;

    var chunk = this.stream.read()
    if(chunk === null) return

    var wantsMore = this.push(chunk)
    if(!wantsMore) {
        this.reading = false
    }
    this.callback(chunk)
}

Upvotes: 5

Related Questions