Michael D. Moffitt

Reputation: 781

Pausing readline in Node.js

Consider the code below ... I am trying to pause the stream after reading the first 5 lines:

var fs          = require('fs');
var readline    = require('readline');
var stream      = require('stream');
var numlines    = 0;
var instream    = fs.createReadStream("myfile.json");
var outstream   = new stream;
var readStream = readline.createInterface(instream, outstream);
readStream.on('line', function(line){
  numlines++;
  console.log("Read " + numlines + " lines");
  if (numlines >= 5) {
    console.log("Pausing stream");
    readStream.pause();
  }
});

The output (copied next) suggests that it keeps reading lines after the pause. Perhaps readline has queued up a few more lines in the buffer, and is feeding them to me anyway ... this would make sense if it continues to read asynchronously in the background, but based on the documentation, I don't know what the proper behavior should be. Any recommendations on how to achieve the desired effect?

Read 1 lines
Read 2 lines
Read 3 lines
Read 4 lines
Read 5 lines
Pausing stream
Read 6 lines
Pausing stream
Read 7 lines

Upvotes: 14

Views: 11149

Answers (4)

Ken Lin

Reputation: 1919

You can reduce how much data is buffered ahead of readline by setting the highWaterMark option on the input stream. See https://nodejs.org/api/stream.html#buffering
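As a rough sketch of what that could look like: highWaterMark is passed to fs.createReadStream (it is an option of the input stream, not of readline itself), so each chunk handed to readline contains fewer lines. The helper names countLinesAroundPause and demo, the temp file, and the 16-byte chunk size are made up for illustration. This only shrinks the overshoot; lines already in the current chunk are still emitted after pause().

```javascript
const fs = require('fs');
const os = require('os');
const path = require('path');
const readline = require('readline');

// Hypothetical helper: calls pause() on the 5th line and reports how many
// 'line' events had fired once the current chunk was fully processed.
function countLinesAroundPause(file, highWaterMark) {
  return new Promise((resolve, reject) => {
    // highWaterMark is the read stream's chunk size in bytes.
    const input = fs.createReadStream(file, { highWaterMark });
    const rl = readline.createInterface({ input, crlfDelay: Infinity });
    let count = 0;
    input.on('error', reject);
    // Covers files with fewer than 5 lines (resolving twice is harmless).
    rl.on('close', () => resolve(count));
    rl.on('line', () => {
      count++;
      if (count === 5) {
        rl.pause();
        // The rest of the current chunk is still emitted synchronously,
        // so take the reading on the next turn of the event loop.
        setImmediate(() => {
          resolve(count);
          rl.close();
          input.destroy();
        });
      }
    });
  });
}

// Hypothetical demo: compares a tiny chunk size with the 64 KiB default.
async function demo() {
  const dir = fs.mkdtempSync(path.join(os.tmpdir(), 'hwm-'));
  const file = path.join(dir, 'lines.txt');
  const lines = Array.from({ length: 1000 }, (_, i) => `line ${i + 1}`);
  fs.writeFileSync(file, lines.join('\n') + '\n');
  try {
    const small = await countLinesAroundPause(file, 16);
    const large = await countLinesAroundPause(file, 64 * 1024);
    console.log(`16-byte chunks: ${small} lines, 64 KiB chunks: ${large} lines`);
    return { small, large };
  } finally {
    fs.rmSync(dir, { recursive: true, force: true });
  }
}
```

Calling demo() prints both counts. A very small highWaterMark means many more read calls, so it trades throughput for a tighter pause.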

Upvotes: 1

Yang Young

Reputation: 622

To add a few points, attach a 'pause' handler:

.on('pause', function() {
    console.log(numlines)
})

This prints 5. The Node.js documentation for the 'pause' event mentions:

  • The input stream is not paused and receives the SIGCONT event. (See events SIGTSTP and SIGCONT)

So I created a temporary buffer in the 'line' handler and use a flag to track whether the reader is currently paused.

.on('line', function(line) {
   if (paused) {
      // Lines that still arrive after pause() go into the temporary buffer.
      putLineInBulkTmp(line);
   } else {
      putLineInBulk(line);
   }
})

Then, in the 'pause' and 'resume' handlers:

.on('pause', function() {
    paused = true;
    doSomething(bulk, function(resp) {
        // Reset bulk for the next batch.
        bulk = [];
        // Seed it with the lines that arrived while paused.
        bulk = clone(bulktmp);
        bulktmp = [];
        lr.resume();
    });
})
.on('resume', () => {
  paused = false;
})

This is how I handle this kind of situation.

Upvotes: 4

Alexander O'Mara

Reputation: 60587

Somewhat unintuitively, the pause method does not stop queued-up line events:

Calling rl.pause() does not immediately pause other events (including 'line') from being emitted by the readline.Interface instance.

There is, however, a third-party module named line-by-line whose pause() does hold back line events until resume() is called.

var LineByLineReader = require('line-by-line'),
    lr = new LineByLineReader('big_file.txt');

lr.on('error', function (err) {
  // 'err' contains error object
});

lr.on('line', function (line) {
  // pause emitting of lines...
  lr.pause();

  // ...do your asynchronous line processing..
  setTimeout(function () {

      // ...and continue emitting lines.
      lr.resume();
  }, 100);
});

lr.on('end', function () {
  // All lines are read, file is closed now.
});

(I have no affiliation with the module, just found it useful for dealing with this issue.)

Upvotes: 23

Michael D. Moffitt

Reputation: 781

So, it turns out that the readline stream tends to "drip" (i.e., leak a few extra lines) even after a pause(). The documentation does not make this clear, but it's true.

If you want the pause() toggle to appear immediate, you'll have to create your own line buffer and accumulate the leftover lines yourself.
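For what it's worth, here is a minimal sketch of such a buffer. LineGate and readLinesGated are hypothetical names, not part of readline: the gate forwards lines to your handler only while it is not paused and holds any that "drip" in afterwards, in order, until resume().

```javascript
const fs = require('fs');
const readline = require('readline');

// Hypothetical helper: delivers lines to `onLine` only while not paused.
// Lines pushed during a pause are queued and delivered on resume().
class LineGate {
  constructor(onLine) {
    this.onLine = onLine;
    this.paused = false;
    this.pending = [];
    this.draining = false;
  }
  push(line) {
    this.pending.push(line);
    this.drain();
  }
  pause() {
    this.paused = true;
  }
  resume() {
    this.paused = false;
    this.drain();
  }
  drain() {
    if (this.draining) return; // guard against re-entrant calls from onLine
    this.draining = true;
    try {
      while (!this.paused && this.pending.length > 0) {
        this.onLine(this.pending.shift());
      }
    } finally {
      this.draining = false;
    }
  }
}

// Hypothetical wiring: readline feeds the gate, and the handler pauses or
// resumes through the returned `reader` instead of calling rl directly.
function readLinesGated(file, onLine) {
  const rl = readline.createInterface({
    input: fs.createReadStream(file),
    crlfDelay: Infinity,
  });
  let closed = false;
  rl.on('close', () => { closed = true; });
  const reader = {
    pause() { gate.pause(); if (!closed) rl.pause(); },
    resume() { if (!closed) rl.resume(); gate.resume(); },
  };
  const gate = new LineGate((line) => onLine(line, reader));
  rl.on('line', (line) => gate.push(line));
  return reader;
}
```

With this, the handler from the question would call reader.pause() at line 5 and see nothing further until reader.resume(). Note that the input can reach its end while lines are still held, so treat the last delivered line, not the 'close' event, as the end of your data.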

Upvotes: 11
