Matt Harrison
Matt Harrison

Reputation: 13587

Putting data back onto a Readable stream

TL;DR How can I read some data from a stream and then put it back allowing other consumers to get the same data event?

Here's a readable stream that streams 1...Infinity:

var Readable = require('stream').Readable;

var readable = new Readable();

var c = 0;

readable._read = function () {

    var self = this;

    setTimeout(function () {
        self.push((++c).toString());
    }, 500);
};

I want to read the first data event, look at the data and then "reset" the stream to its original state and allow other another data listener to consume the first event as if it never happened. I thought unshift() would be the correct method as it says in the docs:

readable.unshift(chunk)#

chunk Buffer | String Chunk of data to unshift onto the read queue This is useful in certain cases where a stream is being consumed by a parser, which needs to "un-consume" some data that it has optimistically pulled out of the source, so that the stream can be passed on to some other party.

That sounds perfect for my needs but it doesn't work how I expect:

...

readable.once('data', function (d) {

    console.log(d.toString());              // Outputs 1

    readable.unshift(d);                    // Put the 1 back on the stream

    readable.on('data', function (d) {
        console.log(d.toString());          // Heh?! Outputs 2, how about 1?
    });

});

Upvotes: 8

Views: 3519

Answers (1)

Matt Harrison
Matt Harrison

Reputation: 13587

So I figured out the answer:

When you call stream.unshift() if will emit the data event immediately if the stream is in flowing mode. So by the time I add the listener in my example, the ship has already sailed.

readable.unshift(d);                  // emits 'data' event

readable.on('data', function (d) {    // missed `data` event
    console.log(d.toString());
});

There's a couple of ways to make it work how I expected:

1) Add the new listener before unshifting:

readable.once('data', function (d) {

    console.log(d.toString());              // Outputs 1

    readable.on('data', function (d) {
        console.log(d.toString());          // Outputs 1,1,2,3...
    });

    readable.unshift(d);                    // Put the 1 back on the stream

});

2) Pause and resume the stream:

readable.once('data', function (d) {

    console.log(d.toString());              // Outputs 1
    readable.pause();                       // Stops the stream from flowing
    readable.unshift(d);                    // Put the 1 back on the stream

    readable.on('data', function (d) {
        console.log(d.toString());          // Outputs 1,1,2,3...
    });

    readable.resume();                      // Start the stream flowing again

});

Upvotes: 15

Related Questions