Darren Cook

Reputation: 28913

Make async call from readline's line callback?

I currently have this working code (slightly simplified here); it is the standard idiom for wrapping readline in a promise.

const fs = require('fs')

function process(fname, options) {
  return new Promise(function (resolve, reject) {
    const stats = {cnt: 0, error: 0, mse: 0.0}
    const reader = require('readline').createInterface({
      input: fs.createReadStream(fname),
      })
    reader.on('error', reject) // Reject the promise on a stream error
    reader.on('line', function (row) {
      stats.error += doSync(row)
      stats.cnt++
      })
    reader.on('close', function () {
      if (stats.cnt >= 1) stats.mse = stats.error / stats.cnt
      resolve(stats)
    })
  })
}

I want to change the call to doSync() to doAsync() (which will be returning a promise).

I tried changing this:

reader.on('line', function (row) {
    stats.error += doSync(row)
    stats.cnt++
    })

to:

reader.on('line', async function (row) {
    stats.error += await doAsync(row)
    stats.cnt++
    })

But it didn't work. Specifically, it processes doAsync() up to the actual thing that is async (a call to a shell command), then immediately goes on to the next line. For all lines in the file. Then the script sits there and we glare at each other.

My hunch is that readline is ignoring the returned promise, and that there is nothing I can do. But I'm hoping the hive mind has some ideas.

I'm on node 8.12.0, but upgrading to 10.x is not out of the question. And I'm not tied to using readline. (But I am tied to processing the input file line by line!)

UPDATE:

NOTE: my doAsync() turned out to have a bug. But even once fixed, readline still did not work.

Switching from readline to line-by-line fixed it. (It is almost a drop-in replacement; but change the 'close' event to the 'end' event.) The accepted answer was more code, but worked equally well.

On a comparison test, the Transform approach took 1m 48s to 1m 49s, the line-by-line approach took 1m 49s to 1m 51s. (NOTE: just two runs each, but that was enough to convince me they were basically identical.)

Using reader.pause()/resume() did not help with readline, and was not needed with line-by-line (using it did enforce strict one-at-a-time processing, but line-by-line worked fine without it).

Upvotes: 2

Views: 789

Answers (1)

Jake Holzinger

Reputation: 6063

Using Transform streams is probably the best way to handle each line asynchronously.

const {Transform} = require('stream');

You can replace the readline library with a simple transform:

function toLines() {
    let line = '';
    return new Transform({
        decodeStrings: false,     // keep incoming string chunks as strings
        readableObjectMode: true, // emit one string per line downstream
        transform(chunk, encoding, callback) {
            // chunk is a string because the source stream sets an encoding
            const lines = chunk.split('\n');

            // The first fragment completes the line carried over from the
            // previous chunk; the last fragment may itself be incomplete,
            // so it is kept in `line` for the next chunk.
            line += lines.shift();
            while (lines.length) {
                this.push(line);
                line = lines.shift();
            }

            callback();
        },
        flush(callback) {
            // Emit the final line if the input did not end with a newline.
            if (line) {
                this.push(line);
            }
            callback();
        }
    });
}

And implement another Transform that gathers the "stats":

function toStats() {
    const stats = {cnt: 0, error: 0, mse: 0.0};
    return new Transform({
        objectMode: true,
        async transform(line, encoding, callback) {
            // callback() only fires after the await, so back-pressure
            // holds the next line until this one is done.
            stats.error += await doAsync(line);
            stats.cnt++;
            callback();
        },
        flush(callback) {
            if (stats.cnt >= 1) stats.mse = stats.error / stats.cnt;
            callback(null, stats); // push the finished stats object
        }
    });
}

Then you can implement process to utilize the transforms:

function process(fname, options) {
    return new Promise((resolve, reject) => {
        fs.createReadStream(fname, {encoding: 'utf8'})
            .pipe(toLines())
            .pipe(toStats())
            .on('error', reject) // NB: pipe() does not forward errors from
                                 // upstream streams; attach 'error' handlers
                                 // to each stream (or use stream.pipeline)
                                 // if you need to catch read errors too.
            .on('data', resolve);
    });
}

Upvotes: 2
