Reputation: 2688
I'm playing with streams and async/await functionality. What I have so far is:
const fs = require('fs');
const parse = require('csv-parse');             // assuming the csv-parse module
const transform = require('stream-transform');  // assuming stream-transform

let logRecord = (record, callback) => {
    console.log(record);
    return callback();
};
let importCSVfromPath = async((csv_path) => {
    return new Promise(function(resolve, reject) {
        var parser = parse();
        var input = fs.createReadStream(csv_path);
        var transformer = transform(logRecord, {parallel: 1});
        input.on('error', (err) => {
            reject(err);
        });
        input.on('end', () => {  // readable streams emit 'end', not 'finish'
            resolve();
        });
        input.pipe(parser).pipe(transformer);
    });
});
Now I want to replace logRecord with importRecord. The problem is that this function has to call functions that are already part of the async stack.
let importRecord = async((record) => {
    .......
    await(insertRow(row));
});
What's the right way to do this?
Upvotes: 2
Views: 594
Reputation: 2658
It's slightly more complicated than that - node.js streams are not adapted (at least not yet) to the ES7 async/await methods.
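That said, if all you need is to call an async function from a callback-style handler like the one stream-transform expects, a small adapter is enough. Here's a minimal sketch, assuming native async/await; callbackify is a made-up helper name, not an existing API:

// A minimal sketch (callbackify is an invented name): adapt an async
// function to the (record, callback) signature the transform expects.
function callbackify(asyncFn) {
    return (record, callback) => {
        asyncFn(record)
            .then((result) => callback(null, result))
            .catch((err) => callback(err));
    };
}

// Hypothetical usage with the transformer from the question:
// var transformer = transform(callbackify(importRecord), {parallel: 1});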
If you'd like to develop this on your own, consider writing a class derived from the Readable stream. Implementing a promise-based interface is quite a task, but it is possible.
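For illustration, a minimal sketch of that direction might look like this (PromisedReadable and whenEnd are invented names, not an existing API):

const { Readable } = require('stream');

// A minimal sketch (invented names): a Readable subclass that exposes
// a promise alongside the usual event-based interface.
class PromisedReadable extends Readable {
    _read() {
        // push chunks here, then this.push(null) when the data is exhausted
    }
    // Resolves once the stream has ended, rejects on error.
    whenEnd() {
        return new Promise((resolve, reject) => {
            this.once('end', resolve);
            this.once('error', reject);
        });
    }
}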
If you're fine with using a permissively licensed framework, however, take a look at Scramjet. With it your code would look like this (most of the example is parsing the CSV - I'll add a helper in the next version):
const fs = require('fs');
const { StringStream } = require('scramjet');

fs.createReadStream("file.csv")              // open your file
    .pipe(new StringStream())                // pass to scramjet
    .split("\n")                             // split by line
    .parse((line) => line.split(","))        // convert lines to arrays
    .map(async (line) => {                   // run asynchronous mapping
        await importRecord(line);            // import the record to the DB
        return logRecord(line);              // return some log for the output
    })
    .pipe(process.stdout);                   // pipe the output wherever you like
I believe this is exactly what you're looking for: it will run your record imports in parallel while keeping the output order.
Upvotes: 1