John Grayson

Reputation: 1496

How to asynchronously createReadStream in node.js with async/await

I am having difficulty using fs.createReadStream to process my CSV file asynchronously:

async function processData(row) {
    // perform some asynchronous function
    await someAsynchronousFunction();
}

fs.createReadStream('/file')
    .pipe(parse({
        delimiter: ',',
        columns: true
    })).on('data', async (row) => {
        await processData(row);
    }).on('end', () => {
        console.log('done processing!')
    })

I want to run an asynchronous function on each record, one at a time, and have all of them finish before the stream reaches on('end').

However, the on('end') gets hit before all of my data finishes processing. Does anyone know what I might be doing wrong?

Thanks in advance!

Upvotes: 6

Views: 10225

Answers (2)

jfriend00

Reputation: 707158

.on('data', ...) does not wait for your await. Remember, an async function returns a promise immediately, and .on() pays no attention to that promise, so the stream just keeps merrily going.

The await only waits inside the function; it does not stop the function from returning immediately. The stream therefore thinks you've processed the data and keeps sending more data and generating more data events.
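To make that ordering concrete, here is a minimal sketch that uses a plain EventEmitter in place of the CSV stream. The runDemo name and the 10 ms delay are made up for illustration; the point is only that emit() discards the promise an async listener returns, so 'end' is handled before any row finishes.

```javascript
const { EventEmitter } = require('node:events');

// Hypothetical demo: resolves with the order in which things were logged.
function runDemo() {
  return new Promise((resolve) => {
    const log = [];
    const emitter = new EventEmitter();

    emitter.on('data', async (row) => {
      // Stand-in for someAsynchronousFunction(): a 10 ms delay.
      await new Promise((done) => setTimeout(done, 10));
      log.push(`processed ${row}`);
    });
    emitter.on('end', () => log.push('end'));

    // emit() calls each listener and discards whatever it returns,
    // including the promise from the async listener.
    [1, 2, 3].forEach((row) => emitter.emit('data', row));
    emitter.emit('end');

    // Give the pending timers time to fire, then report the order.
    setTimeout(() => resolve(log), 100);
  });
}

runDemo().then((log) => console.log(log));
// [ 'end', 'processed 1', 'processed 2', 'processed 3' ]
```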

There are several possible approaches here, but the simplest might be to pause the stream until processData() is done and then restart the stream.
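As a sketch of one of the other approaches (not necessarily the best fit for every case): readable streams are async iterable in current Node.js versions, so a for await...of loop can pull one row at a time and wait for each handler call. Here Readable.from() stands in for the fs.createReadStream(...).pipe(parse(...)) pipeline so the snippet runs on its own, and processEach is a hypothetical helper name.

```javascript
const { Readable } = require('node:stream');

// Hypothetical helper: awaits handler(row) for each row before reading the next.
async function processEach(stream, handler) {
  let count = 0;
  for await (const row of stream) {
    await handler(row);
    count += 1;
  }
  // Reaching this line plays the role of the 'end' event.
  return count;
}

// Usage with an in-memory stream standing in for the CSV pipeline.
const rows = Readable.from([{ id: 1 }, { id: 2 }]);
processEach(rows, async (row) => {
  await new Promise((done) => setTimeout(done, 10));
  console.log('processed', row.id);
}).then((count) => console.log(`done processing ${count} rows`));
```

A stream error surfaces as a rejection of the promise returned by processEach, so a single try/catch or .catch() around the call covers both stream and handler failures.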

Also, does processData() return a promise that is linked to the completion of the async operation? That is also required for await to be able to do its job.

The readable stream doc contains an example of pausing the stream during a data event and then resuming it after some asynchronous operation finishes. Here's their example:

const readable = getReadableStreamSomehow();
readable.on('data', (chunk) => {
  console.log(`Received ${chunk.length} bytes of data.`);
  readable.pause();
  console.log('There will be no additional data for 1 second.');
  setTimeout(() => {
    console.log('Now data will start flowing again.');
    readable.resume();
  }, 1000);
});

Upvotes: 15

TFischer

Reputation: 1348

I ran into the same problem recently. I fixed it by using an array of promises, and waiting for all of them to resolve when .on("end") was triggered.

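import fs from "fs";
import parse from "csv-parse";

export const parseCsv = () =>
  new Promise((resolve, reject) => {
    const promises = [];
    fs.createReadStream('/file')
      .pipe(parse({ delimiter: ',', columns: true }))
      // Start processing each row without waiting; keep the promise for later.
      .on("data", row => promises.push(processData(row)))
      .on("error", reject)
      // Settle only after every row has finished; a failed row rejects the outer promise.
      .on("end", () => Promise.all(promises).then(() => resolve(), reject));
  });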
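
One thing to keep in mind with this pattern: the stream is never paused, so rows are processed concurrently rather than one by one. The self-contained sketch below shows that, with Readable.from() standing in for the CSV pipeline; collectAll and the in-flight counter are hypothetical names added only for the demonstration, and the snippet uses require so it runs as a plain script.

```javascript
const { Readable } = require('node:stream');

// Hypothetical helper: starts handler(row) for every row as it arrives
// and resolves with all results once the stream has ended.
function collectAll(stream, handler) {
  return new Promise((resolve, reject) => {
    const promises = [];
    stream
      .on('data', (row) => promises.push(handler(row)))
      .on('error', reject)
      .on('end', () => Promise.all(promises).then(resolve, reject));
  });
}

// Usage: count how many handler calls overlap.
let inFlight = 0;
let maxInFlight = 0;
collectAll(Readable.from([1, 2, 3]), async (row) => {
  inFlight += 1;
  maxInFlight = Math.max(maxInFlight, inFlight);
  await new Promise((done) => setTimeout(done, 50));
  inFlight -= 1;
  return row * 2;
}).then((results) => {
  console.log(results, 'max concurrent handlers:', maxInFlight);
});
```

If the rows must be handled strictly in order, or the file is large enough that holding one pending promise per row is a concern, pausing the stream is the safer choice.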

Upvotes: 12
