Reputation: 1498
I am using the stream.pipeline functionality from Node to upload some data to S3. The basic idea I'm implementing is pulling files from a request and writing them to S3. I have one pipeline that pulls zip files and writes them to S3 successfully. However, I want my second pipeline to make the same request, but unzip the archive and write the unzipped files to S3. The pipeline code looks like the following:
pipeline(request.get(...), s3Stream(zipFileWritePath)),

pipeline(
  request.get(...),
  new unzipper.Parse(),
  etl.map(entry => entry.pipe(s3Stream(createWritePath(writePath, entry))))
)
The s3Stream function looks like so:
function s3Stream(file) {
  const pass = new stream.PassThrough()
  // the upload reads from the passthrough in the background;
  // the passthrough is returned so it can be used as the pipeline's destination
  s3Store.upload(file, pass)
  return pass
}
The first pipeline works well and is currently running in production. However, when I add the second pipeline, I get the following error:
Error [ERR_STREAM_PREMATURE_CLOSE]: Premature close
at Parse.onclose (internal/streams/end-of-stream.js:56:36)
at Parse.emit (events.js:187:15)
at Parse.EventEmitter.emit (domain.js:442:20)
at Parse.<anonymous> (/node_modules/unzipper/lib/parse.js:28:10)
at Parse.emit (events.js:187:15)
at Parse.EventEmitter.emit (domain.js:442:20)
at finishMaybe (_stream_writable.js:641:14)
at afterWrite (_stream_writable.js:481:3)
at onwrite (_stream_writable.js:471:7)
at /node_modules/unzipper/lib/PullStream.js:70:11
at afterWrite (_stream_writable.js:480:3)
at process._tickCallback (internal/process/next_tick.js:63:19)
Any idea what could be causing this, or solutions to resolve it, would be greatly appreciated!
Upvotes: 17
Views: 25488
Reputation: 1579
When you use pipeline, you agree to consume the readable stream fully; nothing should stop before the readable has ended.
After some time working with these shenanigans, here is some more useful information.
import stream from 'stream'
const s1 = new stream.PassThrough()
const s2 = new stream.PassThrough()
const s3 = new stream.PassThrough()
s1.on('end', () => console.log('end 1'))
s2.on('end', () => console.log('end 2'))
s3.on('end', () => console.log('end 3'))
s1.on('close', () => console.log('close 1'))
s2.on('close', () => console.log('close 2'))
s3.on('close', () => console.log('close 3'))
stream.pipeline(
  s1,
  s2,
  s3,
  async s => { for await (const _ of s) { } }, // final consumer: just drains the readable
  err => console.log('end', err)
)
Now if I call s2.end(), it ends and closes everything downstream of it:
end 2
close 2
end 3
close 3
pipeline is the equivalent of s3(s2(s1))
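In plain .pipe() terms that chain is roughly the sketch below; the difference is that pipeline() also forwards the error to its callback and destroys every stream when one of them fails:
// rough .pipe() equivalent (sketch only); unlike pipeline(), a bare pipe chain
// does not destroy the other streams or report the error when one of them fails
s1.pipe(s2).pipe(s3)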
But if I call s2.destroy(), it prints the following and destroys everything. This is your problem here: a stream was destroyed before it ended normally, whether because of an error or because of a return/break/throw inside an async generator or async function.
close 2
end Error [ERR_STREAM_PREMATURE_CLOSE]: Premature close
at PassThrough.onclose (internal/streams/end-of-stream.js:117:38)
at PassThrough.emit (events.js:327:22)
at emitCloseNT (internal/streams/destroy.js:81:10)
at processTicksAndRejections (internal/process/task_queues.js:83:21) {
code: 'ERR_STREAM_PREMATURE_CLOSE'
}
close 1
close 3
You must not leave any of the streams without a way to catch its errors. From the Node documentation:
stream.pipeline() leaves dangling event listeners on the streams after the callback has been invoked. In the case of reuse of streams after failure, this can cause event listener leaks and swallowed errors.
This is essentially the check inside Node's end-of-stream (the onclose you can see in both stack traces above): if a stream closes before its readable side has ended or its writable side has finished, the callback is invoked with ERR_STREAM_PREMATURE_CLOSE.
const onclose = () => {
  if (readable && !readableEnded) {
    if (!isReadableEnded(stream))
      return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE());
  }
  if (writable && !writableFinished) {
    if (!isWritableFinished(stream))
      return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE());
  }
  callback.call(stream);
};
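Applied to the question, one way to follow that rule is to give every unzipped entry its own pipeline instead of a bare entry.pipe(...), so that no stream is left without an error handler. This is only a sketch built on the names from the question (s3Stream, createWritePath, writePath, etl); uploadEntry is a hypothetical helper, and it assumes etl.map waits for a returned promise:
import { pipeline } from 'stream'

// Hypothetical helper: pipe one unzipped entry into S3 and surface any error,
// instead of the bare entry.pipe(s3Stream(...)) from the question.
const uploadEntry = (entry, writePath) =>
  new Promise((resolve, reject) =>
    pipeline(
      entry,
      s3Stream(createWritePath(writePath, entry)),
      err => (err ? reject(err) : resolve())
    )
  )

// in the second pipeline: etl.map(entry => uploadEntry(entry, writePath))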
Upvotes: 13