Oded Answer
Oded Answer

Reputation: 21

"Failed to pipe. The response has been emitted already" when reading a stream (nodejs)

So my code is supposed to read some lines from a CSV file, convert them to an array of JSON objects, and return that array. To read the file as a stream, I am using got, and then using it in fast-csv. In order to return the resulting array, I put the entire thing into a Promise like this:

async GetPage() : Promise<{OutputArray:any[], StartingIndex:number}>{
        return new Promise(async (resolve, reject) => {
            const output:any[] = []; 
            const startingIndex = this.currentLocation;
            try{
                parseStream(this.source, {headers:true, maxRows:this.maxArrayLength, skipRows:this.currentLocation, ignoreEmpty:true, delimiter:this.delimiter})
                    .on('error', error => console.log(`parseStream: ${error}`))
                    .on('data', row => {
                        const obj = this.unflatten(row); // data is flattened JSON, need to unflatten it
                        output.push(obj); // append to output array
                        this.currentLocation++;
                    })
                    .on('end', (rowCount: number) => {
                        console.log(`Parsed ${this.currentLocation} rows`);
                        resolve({OutputArray:output, StartingIndex:startingIndex});
                    });
            }
            catch(ex){
                console.log(`parseStream: ${ex}`);
                throw new Error(ex);
            }
        })
    }

Now when I call this once (await GetPage()) it works perfectly fine. The problem is when I'm calling it a second time in a row. I'm getting the following:

UnhandledPromiseRejectionWarning: Error: Failed to pipe. The response has been emitted already.

I've seen a similar case over here: https://github.com/sindresorhus/file-type/issues/342 but from what I gather this is a different case, or rather if it's the same I don't know how to apply the solution here.

The GetPage is a method inside a class CSVStreamParser which is given a Readable in the constructor, and I create that Readable like this: readable:Readable = got.stream(url)

What confuses me is that my first version of GetPage did not include a Promise, but rather accepted a callback (I just sent console.log to test it) and when I called it several times in a row there was no error, but it could not return a value so I converted it to a Promise.

Thank you! :)

EDIT: I have managed to make it work by re-opening the stream at the start of GetPage(), but I am wondering if there is a way to achieve the same result without having to do so? Is there a way to keep the stream open?

Upvotes: 2

Views: 4055

Answers (1)

skara9
skara9

Reputation: 4194

First, remove both of the async, since you are already returning a Promise.

Then remove the try/catch block and throw since you shouldn't throw in a promise. Instead use the reject function.

GetPage() : Promise<{OutputArray:any[], StartingIndex:number}>{
    return new Promise((resolve, reject) => {
        const output:any[] = []; 
        const startingIndex = this.currentLocation;
        parseStream(this.source, {headers:true, maxRows:this.maxArrayLength, skipRows:this.currentLocation, ignoreEmpty:true, delimiter:this.delimiter})
            .on('error', error => reject(error))
            .on('data', row => {
                const obj = this.unflatten(row); // data is flattened JSON, need to unflatten it
                output.push(obj); // append to output array
                this.currentLocation++;
            })
            .on('end', (rowCount: number) => {
                console.log(`Parsed ${this.currentLocation} rows`);
                resolve({OutputArray:output, StartingIndex:startingIndex});
            });
    });
}

Here's some resources to help you learn about async functions and promises.

Upvotes: 0

Related Questions