naivedeveloper
naivedeveloper

Reputation: 2958

How to force Node.js Transform stream to finish?

Consider the following scenario. I have two Node Transform streams:

Transform stream 1

function T1(options) {
  if (! (this instanceof T1)) {
    return new T1(options);
  }

  Transform.call(this, options);
}
util.inherits(T1, Transform);

T1.prototype._transform = function(chunk, encoding, done) {
  console.log("### Transforming in t1");
  this.push(chunk);
  done();
};

T1.prototype._flush = function(done) {
  console.log("### Done in t1");
  done();
};

Transform stream 2

function T2(options) {
  if (! (this instanceof T2)) {
    return new T2(options);
  }

  Transform.call(this, options);
}
util.inherits(T2, Transform);

T2.prototype._transform = function(chunk, encoding, done) {
  console.log("### Transforming in t2");
  this.push(chunk);
  done();
};

T2.prototype._flush = function(done) {
  console.log("### Done in t2");
  done();
};

And, I'm wanting to apply these transform streams before returning a response. I have a simple HTTP server, and on each request, I fetch a resource and would like these transformations to be applied to this fetched resource and then send the result of the second transformation to the original response:

var options = require('url').parse('http://localhost:1234/data.json');
options.method = 'GET';

http.createServer(function(req, res) {
  var req = http.request(options, function(httpRes) {
    var t1 = new T1({});
    var t2 = new T2({});

    httpRes
      .pipe(t1)
      .pipe(t2)
      .on('finish', function() {
        // Do other stuff in here before sending request back
        t2.pipe(res, { end : true });
      });
  });

  req.end();
}).listen(3001);

Ultimately, the finish event never gets called, and the request hangs and times out because the response is never resolved. I've noticed that if I just pipe t2 into res, it seems to work fine:

  .pipe(t1)
  .pipe(t2)
  .pipe(res, { end : true });

But, this scenario doesn't seem feasible because I need to do some extra work before returning the response.

Upvotes: 4

Views: 3561

Answers (1)

Michał Karpacki
Michał Karpacki

Reputation: 2658

This happens because you need to let node know that the stream is being consumed somewhere, otherwise the last stream will just fill up the buffer and considering your data is longer than the highwaterMark option (usually 16) and then halt waiting for the data to be consumed.

There are three ways of consuming a stream in full:

  • piping to a readable stream (what you did in the second part of your question)
  • reading consecutive chunks by calling the read method of a stream
  • listening on "data" events (essentially stream.on("data", someFunc)).

The last option is the quickest, but will result in consuming the stream without looking at memory usage.

I'd also note that using the "finish" event might be a little misleading, since it is called when the last data is read, but not necessarily emitted. On a Transform stream, since it's a readable as well it's much better to use the "end" event.

Upvotes: 3

Related Questions