Reputation: 1496
I am having difficulty using fs.createReadStream to process my CSV file asynchronously:
async function processData(row) {
  // perform some asynchronous function
  await someAsynchronousFunction();
}

fs.createReadStream('/file')
  .pipe(parse({
    delimiter: ',',
    columns: true
  }))
  .on('data', async (row) => {
    await processData(row);
  })
  .on('end', () => {
    console.log('done processing!')
  })
I want to run an asynchronous function on each record, one at a time, before the stream created by createReadStream reaches on('end').
However, on('end') fires before all of my data has finished processing. Does anyone know what I might be doing wrong?
Thanks in advance!
Upvotes: 6
Views: 10225
Reputation: 707158
.on('data', ...) does not wait for your await. Remember, an async function returns a promise immediately, and .on() pays no attention to that promise, so it just keeps going.
The await only waits inside the function; it does not stop the function from returning immediately. The stream therefore assumes you have processed the data and keeps emitting more data events.
There are several possible approaches here, but the simplest might be to pause the stream until processData() is done and then resume it.
Also, does processData() return a promise that is linked to the completion of the async operation? That is also required for await to be able to do its job.
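To illustrate what "linked to the completion" means, here is a minimal sketch. All names in it (slowOperation, notLinked, linked, demo) are hypothetical and not from the question. It contrasts an async function that merely starts callback-based work with one whose promise resolves only when that work finishes:

```javascript
// Hypothetical callback-style operation standing in for the real async work.
function slowOperation(row, events, callback) {
  setTimeout(() => {
    events.push(`finished ${row}`);
    callback();
  }, 10);
}

// Not linked: the returned promise resolves as soon as the function body ends,
// long before slowOperation's callback runs. Awaiting it waits for nothing.
async function notLinked(row, events) {
  slowOperation(row, events, () => {});
}

// Linked: the promise resolves only when slowOperation calls back.
function linked(row, events) {
  return new Promise((resolve) => slowOperation(row, events, resolve));
}

// Records the order in which things happen.
async function demo() {
  const events = [];
  await notLinked('a', events);
  events.push('after await notLinked');
  await linked('b', events);
  events.push('after await linked');
  return events;
}
```

Calling demo() should show 'after await notLinked' recorded before 'finished a', while 'after await linked' comes only after 'finished b'.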
The readable stream documentation contains an example of pausing the stream during a data event and then resuming it after some asynchronous operation finishes. Here's their example:
const readable = getReadableStreamSomehow();
readable.on('data', (chunk) => {
  console.log(`Received ${chunk.length} bytes of data.`);
  readable.pause();
  console.log('There will be no additional data for 1 second.');
  setTimeout(() => {
    console.log('Now data will start flowing again.');
    readable.resume();
  }, 1000);
});
Upvotes: 15
Reputation: 1348
I ran into the same problem recently. I fixed it by using an array of promises and waiting for all of them to resolve when .on("end") was triggered.
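import fs from "fs";
// Default import as in the original answer; newer csv-parse releases may
// expose parse as a named export instead.
import parse from "csv-parse";

// processData(row) is the asker's function and must return a promise.
export const parseCsv = () =>
  new Promise((resolve, reject) => {
    const promises = [];
    fs.createReadStream('/file')
      .pipe(parse({ delimiter: ',', columns: true }))
      // Start processing each row right away and remember its promise.
      .on("data", row => promises.push(processData(row)))
      .on("error", reject)
      // Wait for every row; forward a Promise.all rejection to the caller
      // instead of dropping it.
      .on("end", () => Promise.all(promises).then(() => resolve(), reject));
  });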
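One trade-off with this approach: each processData() call starts as soon as its row is parsed, so rows are processed concurrently rather than one by one, and every pending promise is held in memory until the end. If strict one-at-a-time handling matters, a different technique, async iteration over the stream, may be a better fit. A minimal sketch, assuming a Node version where readable streams are async iterable; processInOrder is a hypothetical name, and Readable.from is only a stand-in for the CSV parser stream:

```javascript
const { Readable } = require('node:stream');

// Hypothetical helper: awaits handler(row) for each row before the next
// row is pulled from the stream, and returns how many rows were handled.
async function processInOrder(rows, handler) {
  let count = 0;
  for await (const row of rows) {
    await handler(row);
    count += 1;
  }
  return count;
}

// Usage with an in-memory object stream standing in for the parser:
// processInOrder(Readable.from([{ id: 1 }, { id: 2 }]), processData)
//   .then((n) => console.log(`done processing ${n} rows`));
```

The loop ends only after the stream is exhausted and the last handler call has settled, and a rejected handler rejects the returned promise.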
Upvotes: 12