Reputation: 21
So my code is supposed to read some lines from a CSV file, convert them to an array of JSON objects, and return that array.
To read the file as a stream, I am using got, and then using it in fast-csv.
In order to return the resulting array, I put the entire thing into a Promise like this:
async GetPage() : Promise<{OutputArray:any[], StartingIndex:number}>{
    return new Promise(async (resolve, reject) => {
        const output:any[] = [];
        const startingIndex = this.currentLocation;
        try{
            parseStream(this.source, {headers:true, maxRows:this.maxArrayLength, skipRows:this.currentLocation, ignoreEmpty:true, delimiter:this.delimiter})
                .on('error', error => console.log(`parseStream: ${error}`))
                .on('data', row => {
                    const obj = this.unflatten(row); // data is flattened JSON, need to unflatten it
                    output.push(obj); // append to output array
                    this.currentLocation++;
                })
                .on('end', (rowCount: number) => {
                    console.log(`Parsed ${this.currentLocation} rows`);
                    resolve({OutputArray:output, StartingIndex:startingIndex});
                });
        }
        catch(ex){
            console.log(`parseStream: ${ex}`);
            throw new Error(ex);
        }
    })
}
Now when I call this once (await GetPage()) it works perfectly fine.
The problem is when I'm calling it a second time in a row. I'm getting the following:
UnhandledPromiseRejectionWarning: Error: Failed to pipe. The response has been emitted already.
I've seen a similar case over here: https://github.com/sindresorhus/file-type/issues/342 but from what I gather this is a different case, or rather if it's the same I don't know how to apply the solution here.
GetPage is a method inside a class CSVStreamParser, which is given a Readable in the constructor, and I create that Readable like this: readable:Readable = got.stream(url)
What confuses me is that my first version of GetPage did not include a Promise, but rather accepted a callback (I just sent console.log to test it), and when I called it several times in a row there was no error. It could not return a value, though, so I converted it to a Promise.
Thank you! :)
EDIT: I have managed to make it work by re-opening the stream at the start of GetPage(), but I am wondering if there is a way to achieve the same result without having to do so. Is there a way to keep the stream open?
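For reference, the re-opening workaround mentioned in the edit might look roughly like the sketch below. It is only an illustration under stated assumptions: PagedReader, openSource and getPage are hypothetical names, Readable.from stands in for got.stream(url), and plain row counting stands in for fast-csv's skipRows/maxRows options. The idea is that the class receives a factory that opens a fresh stream for each page, because a stream that has already been consumed cannot be read a second time.

```typescript
import { Readable } from "stream";

// Hypothetical pager: it is given a factory that opens a fresh stream on demand,
// rather than a single Readable that is used up after the first page.
class PagedReader {
  private currentLocation = 0;
  private readonly openSource: () => Readable;
  private readonly pageSize: number;

  constructor(openSource: () => Readable, pageSize: number) {
    this.openSource = openSource; // assumption: in the real code, () => got.stream(url)
    this.pageSize = pageSize;
  }

  async getPage(): Promise<{ rows: string[]; startingIndex: number }> {
    const startingIndex = this.currentLocation;
    const rows: string[] = [];
    let seen = 0;
    const source = this.openSource(); // re-open the stream for every page
    for await (const item of source) {
      if (seen++ < startingIndex) continue; // skip rows returned by earlier pages
      rows.push(String(item));
      if (rows.length === this.pageSize) break; // leaving the loop destroys the stream
    }
    this.currentLocation += rows.length;
    return { rows, startingIndex };
  }
}
```

The trade-off of this sketch is that every page re-reads the source from the beginning and skips what was already returned, so it costs one request per page.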
Upvotes: 2
Views: 4055
Reputation: 4194
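First, remove both async keywords: GetPage already returns a Promise explicitly, and the executor passed to new Promise should not be async.
Then remove the try/catch block and the throw. Errors from the stream arrive later, in the 'error' handler, so the try/catch never sees them, and a throw inside an async executor rejects the executor's own promise rather than the one you return, which surfaces as an unhandled rejection. Instead, pass the error to the reject function.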
GetPage() : Promise<{OutputArray:any[], StartingIndex:number}>{
    return new Promise((resolve, reject) => {
        const output:any[] = [];
        const startingIndex = this.currentLocation;
        parseStream(this.source, {headers:true, maxRows:this.maxArrayLength, skipRows:this.currentLocation, ignoreEmpty:true, delimiter:this.delimiter})
            .on('error', error => reject(error))
            .on('data', row => {
                const obj = this.unflatten(row); // data is flattened JSON, need to unflatten it
                output.push(obj); // append to output array
                this.currentLocation++;
            })
            .on('end', (rowCount: number) => {
                console.log(`Parsed ${this.currentLocation} rows`);
                resolve({OutputArray:output, StartingIndex:startingIndex});
            });
    });
}
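To see the effect of wiring 'error' to reject in isolation, here is a minimal sketch that does not depend on got or fast-csv. The names collect, failingSource and demo are hypothetical, and a plain Node EventEmitter stands in for the parser stream; the point is only that the caller's await now receives the failure.

```typescript
import { EventEmitter } from "events";

// Hypothetical helper: resolves with every 'data' payload once 'end' fires,
// and rejects as soon as the emitter reports an 'error'.
function collect<T>(source: EventEmitter): Promise<T[]> {
  return new Promise<T[]>((resolve, reject) => {
    const rows: T[] = [];
    source
      .on("error", (err: Error) => reject(err)) // hand the failure to the caller
      .on("data", (row: T) => rows.push(row))
      .on("end", () => resolve(rows));
  });
}

// Stand-in for a parser stream that fails after emitting one row.
function failingSource(): EventEmitter {
  const emitter = new EventEmitter();
  setImmediate(() => {
    emitter.emit("data", { id: 1 });
    emitter.emit("error", new Error("parse failed"));
  });
  return emitter;
}

// The rejection can now be handled with an ordinary try/catch around await.
async function demo(): Promise<string> {
  try {
    await collect(failingSource());
    return "resolved";
  } catch (err) {
    return `rejected: ${(err as Error).message}`;
  }
}
```

With the original console.log handler, the promise in this situation would never settle, so the caller would wait forever instead of seeing the error.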
Here are some resources to help you learn about async functions and promises.
Upvotes: 0