Reputation: 751
I'm using fs.createReadStream
to read uploaded csv file data.When data row received,I'm validate those data and push validated data in to the array.
The issue was,it's receive final row after called end
.
I've use csv with four rows for for bellow test.All four records are validation pass.
const stream = fs.createReadStream(filePath)
.pipe(csv.parse({ headers: true }))
.on("error", (error) => {
throw error.message;
})
.on("data", async (row) => {
try {
stream.pause();
const mobile = Helper.validateMobile(row.Telephone)
const validationerrors = await Helper.ValidateCustomer(null, mobile, row.code, "BULK");
console.log(`validationerrors ---> ${validationerrors} Telephone--> ${row.Telephone}`)
if (validationerrors.length) {
throw validationerrors.message
} else {
csvData.push(row);
}
} finally {
stream.resume();
}
})
.on("end", async () => {
console.log("ENDDD ---->", csvData)
});
Console.log
out put is like this.Final row (row with 777223478) receive after on("end")
calls.
validationerrors ---> Telephone--> 778786516
validationerrors ---> Telephone--> 718254596
validationerrors ---> Telephone--> 712760763
ENDDD ----> [
{
Code: 'CTLD000323',',
Telephone: '778786516' },
{
Code: 'CTLD000324',
Telephone: '718254596' },
{
Code: 'CTLD000376',
Telephone: '712760763' }
]
validationerrors ---> Telephone--> 777223478
Upvotes: 0
Views: 1346
Reputation: 707158
So, it appears that the CSV library does not respect the .pause()
method and if it has multiple rows queued up, it goes ahead and fires data
events for them, even if the stream is paused. The two ways I know of to fix that are to modify the csv module to respect the stream pause or to queue up line as they come in so you can manually pause processing of them when you are already in the middle of processing something. And, you also have to queue up the end
event since the stream will send it before you're done processing. I've handled the end event by synthesizing my own finalEnd
event. Here's a rough implementation of a queue that buffers data
events that arrive while the stream is paused.
let paused = false;
const queue = [];
let end = false;
const stream = fs.createReadStream(filePath)
.pipe(csv.parse({ headers: true }))
.on("error", (error) => {
throw error.message;
})
.on("data", async (row) => {
queue.push(row);
if (!paused) {
stream.pause();
paused = true;
while (queue.length) {
try {
await processRow(queue.shift());
} catch (e) {
// decide what to do here if you get an error processing a row
console.log(e);
}
}
paused = false;
stream.resume();
if (end) {
stream.emit("finalEnd");
}
}
async function processRow(row) {
const mobile = Helper.validateMobile(row.Telephone)
const validationerrors = await Helper.ValidateCustomer(null, mobile, row.code, "BULK");
console.log(`validationerrors ---> ${validationerrors} Telephone--> ${row.Telephone}`)
if (validationerrors.length) {
throw validationerrors.message
} else {
csvData.push(row);
}
}
})
.on("end", async () => {
end = true;
if (!queue.length && !paused) {
stream.emit("finalEnd");
}
}).on("finalEnd", () => {
console.log("ENDDD ---->", csvData)
});
Upvotes: 3