Reputation: 13
I am extremely new to Node.js and promises. After a Google search I found the code below and turned it into a promise that reads a CSV file with fast-csv and groups the rows into arrays keyed by packNr. The code below splits the file as expected, but my issue is writing each array back out to a new CSV file: I need each packNr to finish writing before the next one begins. With the code below, only the header of each file is written before the code completes.
var readCSV = function(path, options) {
  const datas = {}; // datas['123'] = CSV data filtered for packNr = 123
  return new Promise(function(resolve, reject) {
    fastCsv
      .parseFile(path, options)
      .on('data', d => {
        if (!datas[d.packNr]) datas[d.packNr] = [];
        datas[d.packNr].push(d);
      })
      .on('end', () => {
        Object.keys(datas).forEach(packNr => {
          // For each ID, write a new CSV file
          fastCsv
            .write(datas[packNr], options)
            .pipe(fs.createWriteStream(`./data-id-${packNr}.csv`));
        });
        resolve();
      });
  });
};
Upvotes: 0
Views: 184
Reputation: 354
fastCsv gives you a stream. Streams have their own flow (if I can say that): they inherit from the EventEmitter constructor, so they are event emitters. That means that to hook their processing into your file's flow, you should use their events.
When a stream's connection closes, it emits a finish and a close event. So you can resolve your Promise from one of them, like so:
.on('end', () => {
  Object.keys(datas).forEach(packNr => {
    // For each ID, write a new CSV file
    const ws = fs.createWriteStream(`./data-id-${packNr}.csv`)
    fastCsv
      .write(datas[packNr], options)
      .pipe(ws)
    // Resolve once the destination stream has flushed everything
    ws.on('finish', () => resolve())
    // ws.on('close', () => resolve()) should work as well
  })
})
Another solution, even better actually since it handles all of a stream's end-of-connection events (end, finish, close, error), is to use the pipeline method from the stream module:
// on the top of your file
const { pipeline } = require('stream')
.on('end', () => {
  Object.keys(datas).forEach(packNr => {
    // For each ID, write a new CSV file
    pipeline(
      fastCsv.write(datas[packNr], options),
      fs.createWriteStream(`./data-id-${packNr}.csv`),
      e => {
        if (e) reject(e)
        else resolve()
      }
    )
  })
})
===== (Edit) Final solution =====
A problem still remains in my logic: each iteration of forEach() creates a new stream, so the promise resolves at the end of the first stream that finishes, not at the end of the last one. To fix it, count the completed pipelines and resolve only when all of them are done:
const { pipeline } = require('stream')
.on('end', () => {
  const dataObj = Object.keys(datas)
  const lg = dataObj.length
  let i = 0
  dataObj.forEach(packNr => {
    // For each ID, write a new CSV file
    pipeline(
      fastCsv.write(datas[packNr], options),
      fs.createWriteStream(`./data-id-${packNr}.csv`),
      e => {
        if (e) reject(e)
        // Count finished pipelines; resolve only after the last one
        i++
        if (i === lg) resolve()
      }
    )
  })
})
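For comparison, here is a minimal sketch of an equivalent approach (assuming the same datas object and options as above): give each pipeline its own promise and let Promise.all settle the outer promise once every file has finished writing, so no manual counter is needed.
.on('end', () => {
  const writes = Object.keys(datas).map(packNr =>
    new Promise((res, rej) => {
      pipeline(
        fastCsv.write(datas[packNr], options),
        fs.createWriteStream(`./data-id-${packNr}.csv`),
        e => (e ? rej(e) : res())
      )
    })
  )
  // Settle the outer promise only after every file has been written
  Promise.all(writes).then(resolve, reject)
})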
Upvotes: 1
Reputation: 687
I think the problem is that fastCsv.write is itself asynchronous, so of course the resolve() executes before it completes. I believe the answer is to wait until the fastCsv.write() completes before resolving.
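For example, a minimal sketch of that idea (using a single, hypothetical packNr of '123' for brevity): resolve only once the destination stream emits finish.
const ws = fs.createWriteStream('./data-id-123.csv') // hypothetical output path
fastCsv.write(datas['123'], options).pipe(ws)
ws.on('finish', () => resolve()) // the file has been fully flushed to disk
ws.on('error', reject)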
Upvotes: 0