Reputation: 28913
I currently have this working code (slightly simplified here); it is the standard idiom for wrapping readline in a promise.
function process(fname, options) {
  return new Promise(function (resolve, reject) {
    const stats = {cnt: 0, error: 0, mse: 0.0}
    const reader = require('readline').createInterface({
      input: fs.createReadStream(fname),
    })
    reader.on('error', reject) // reject the promise on any error
    reader.on('line', function (row) {
      stats.error += doSync(row)
      stats.cnt++
    })
    reader.on('close', function () {
      if (stats.cnt >= 1) stats.mse = stats.error / stats.cnt
      resolve(stats)
    })
  })
}
I want to change the call from doSync() to doAsync() (which will return a promise). I tried changing this:
reader.on('line', function (row) {
  stats.error += doSync(row)
  stats.cnt++
})
to:
reader.on('line', async function (row) {
  stats.error += await doAsync(row)
  stats.cnt++
})
But it didn't work. Specifically, it processes doAsync() up to the actual thing that is async (a call to a shell command), then immediately goes on to the next line, and does that for every line in the file. Then the script just sits there and we glare at each other.
My hunch is that readline is ignoring the returned promise, and that there is nothing I can do. But I'm hoping the hive mind has some ideas.
I'm on node 8.12.0, but upgrading to 10.x is not out of the question. And I'm not tied to using readline. (But I am tied to processing the input file line by line!)
UPDATE:
NOTE: my doAsync() turned out to have a bug. But even once fixed, readline still did not work.
Switching from readline to line-by-line fixed it. (It is almost a drop-in replacement; just change the 'close' event to the 'end' event.) The accepted answer was more code, but worked equally well.
On a comparison test, the Transform approach took 1m 48s to 1m 49s, and the line-by-line approach took 1m 49s to 1m 51s. (NOTE: just two runs each, but that was enough to convince me they were basically identical.)
Using reader.pause()/resume() did not help with readline, and was not needed with line-by-line (using it did enforce strictly one-at-a-time processing, but it still worked fine without that).
Upvotes: 2
Views: 789
Reputation: 6063
Using Transform streams is probably the best way to handle each line asynchronously.
const {Transform} = require('stream');
You can replace the readline library with a simple transform:
function toLines() {
  let line = '';
  return new Transform({
    decodeStrings: false,
    readableObjectMode: true,
    transform(chunk, encoding, callback) {
      const lines = chunk.split('\n');
      line += lines.shift(); // first piece completes any buffered partial line
      while (lines.length) {
        this.push(line);
        line = lines.shift(); // last piece may be partial; keep it buffered
      }
      callback();
    },
    flush(callback) {
      if (line) {
        this.push(line); // emit a final line that had no trailing newline
      }
      callback();
    }
  });
}
And implement another Transform that gathers the "stats":
function toStats() {
  const stats = {cnt: 0, error: 0, mse: 0.0};
  return new Transform({
    objectMode: true,
    async transform(line, encoding, callback) {
      // callback() is not invoked until doAsync() resolves,
      // so lines are processed one at a time
      try {
        stats.error += await doAsync(line);
        stats.cnt++;
        callback();
      } catch (err) {
        callback(err); // surface doAsync failures as stream errors
      }
    },
    flush(callback) {
      if (stats.cnt >= 1) stats.mse = stats.error / stats.cnt;
      callback(null, stats);
    }
  });
}
Then you can implement process to utilize the transforms:
function process(fname, options) {
  return new Promise((resolve, reject) => {
    fs.createReadStream(fname, {encoding: 'utf8'})
      .pipe(toLines())
      .pipe(toStats())
      .on('error', reject)
      .on('data', resolve);
  });
}
Upvotes: 2