user732456
user732456

Reputation: 2688

await for function with callback

I'm playing with streams and async/await functionality. What I have so far is:

let logRecord = ((record, callback) => {
     console.log(record);
     return callback();
});

let importCSVfromPath = async((csv_path) => {
    return new Promise(function(resolve, reject) {
        var parser = parse();
        var input = fs.createReadStream(csv_path);
        var transformer = transform(logRecord, {parallel: 1});

        input.on('error', (err) => {
            reject(err);
        });
        input.on('finish', ()=> {
            resolve();
        });

        input.pipe(parser).pipe(transformer);
    });
});

Now I want to replace logRecord with importRecord. The problem is that this function has to use functions that are already part of the async stack.

let importRecord = async( (record) => {
    .......
    await(insertRow(row));
});

What's the right way to do this?

Upvotes: 2

Views: 594

Answers (1)

Michał Karpacki
Michał Karpacki

Reputation: 2658

It's slightly more complicated than this - node.js streams are not adapted (at least not yet) to the es7 async/await methods.

If you'd like to develop this on your own, consider writing a class derived from Readable stream. Implementing a promise based interface is quite a task, but it is possible.

If you're however fine with using a permissive licensed framework - take a look at Scramjet. With it your code will look like this (most of the example is parsing the CSV - I'll add a helper in the next version):

fs.createReadStream("file.csv")        // open your file
    .pipe(new StringStream())          // pass to scramjet
    .split("\n")                       // split by line
    .parse((line) => line.split(","))  // convert lines to arrays
    .map(async (line) => {             // run asynchrounous mapping
        await importRecord(line);      // import log to DB
        return logRecord(line);        // return some log for the output
    })
    .pipe(process.stdout);             // pipe the output wherever you like

I believe it's exactly what you're looking for and it will run your record imports in parallel, while keeping the output order.

Upvotes: 1

Related Questions