Jason Cromer

Reputation: 1498

Error [ERR_STREAM_PREMATURE_CLOSE]: Premature close in Node Pipeline stream

I am using the stream.pipeline functionality from Node to upload some data to S3. The basic idea I'm implementing is pulling files from a request and writing them to S3. I have one pipeline that pulls zip files and writes them to S3 successfully. However, I want my second pipeline to make the same request, but unzip and write the unzipped files to S3. The pipeline code looks like the following:

pipeline(request.get(...), s3Stream(zipFileWritePath)),
pipeline(request.get(...), new unzipper.Parse(), etl.map(entry => entry.pipe(s3Stream(createWritePath(writePath, entry)))))

The s3Stream function looks like so:

function s3Stream(file) {
    // Hand S3 a PassThrough to read from: whatever gets piped into the
    // returned stream is what ends up being uploaded.
    const pass = new stream.PassThrough()
    s3Store.upload(file, pass)
    return pass
}
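(For reference, s3Store.upload can be thought of as a thin wrapper around a streaming S3 upload. A simplified sketch of that assumption, using the AWS SDK v2 API with a placeholder bucket name:)

const AWS = require('aws-sdk')
const s3 = new AWS.S3()

const s3Store = {
    upload(key, body) {
        // s3.upload accepts a readable stream as Body and streams it to S3;
        // .promise() resolves once the managed upload has completed.
        return s3.upload({ Bucket: 'my-bucket', Key: key, Body: body }).promise()
    }
}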

The first pipeline works well and is currently running reliably in production. However, when I add the second pipeline, I get the following error:

Error [ERR_STREAM_PREMATURE_CLOSE]: Premature close
at Parse.onclose (internal/streams/end-of-stream.js:56:36)
at Parse.emit (events.js:187:15)
at Parse.EventEmitter.emit (domain.js:442:20)
at Parse.<anonymous> (/node_modules/unzipper/lib/parse.js:28:10)
at Parse.emit (events.js:187:15)
at Parse.EventEmitter.emit (domain.js:442:20)
at finishMaybe (_stream_writable.js:641:14)
at afterWrite (_stream_writable.js:481:3)
at onwrite (_stream_writable.js:471:7)
at /node_modules/unzipper/lib/PullStream.js:70:11
at afterWrite (_stream_writable.js:480:3)
at process._tickCallback (internal/process/next_tick.js:63:19)

Any idea what could be causing this, or how to resolve it, would be greatly appreciated!

Upvotes: 17

Views: 25488

Answers (1)

Sceat

Reputation: 1579

TL;DR

When you use a pipeline, you commit to consuming the readable stream fully; you don't want anything to stop before the readable has ended.
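Applied to the second pipeline above, that means every entry coming out of unzipper has to be consumed to completion before the pipeline is allowed to finish. Here is a rough sketch of that idea; it swaps etl.map for a plain async consumer (the same pattern used in the deep dive below) and reuses the question's request, s3Stream and createWritePath helpers, so treat it as an illustration rather than drop-in code:

import stream from 'stream'
import { promisify } from 'util'
import unzipper from 'unzipper'

const pump = promisify(stream.pipeline)

pump(
    request.get(/* same request as in the question */),
    new unzipper.Parse(),
    async entries => {
        // Consume every entry completely: pipe it into the S3 stream and wait
        // for that inner pipeline to finish, so nothing is destroyed before
        // the readable side has ended.
        for await (const entry of entries) {
            await pump(entry, s3Stream(createWritePath(writePath, entry)))
        }
    }
).catch(err => console.error('pipeline failed', err))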

Deep dive

After some time working with these shenanigans, here is some more useful information.

import stream from 'stream'

const s1 = new stream.PassThrough()
const s2 = new stream.PassThrough()
const s3 = new stream.PassThrough()

s1.on('end', () => console.log('end 1'))
s2.on('end', () => console.log('end 2'))
s3.on('end', () => console.log('end 3'))
s1.on('close', () => console.log('close 1'))
s2.on('close', () => console.log('close 2'))
s3.on('close', () => console.log('close 3'))

stream.pipeline(
    s1,
    s2,
    s3,
    async s => { for await (const _ of s) { } }, // drain the output so the pipeline can finish
    err => console.log('end', err)
)

Now, if I call s2.end(), it ends and closes all of its parents (everything downstream of it in the composition):

end 2
close 2
end 3
close 3

pipeline is the equivalent of s3(s2(s1))
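In .pipe() terms, that is roughly the same data flow (minus the error forwarding and cleanup that pipeline adds on top):

s1.pipe(s2).pipe(s3)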

But if I call s2.destroy(), it prints the following and destroys everything. This is your problem here: a stream is destroyed before it ends normally, either by an error or by a return/break/throw inside an async generator or async function.

close 2
end Error [ERR_STREAM_PREMATURE_CLOSE]: Premature close
    at PassThrough.onclose (internal/streams/end-of-stream.js:117:38)
    at PassThrough.emit (events.js:327:22)
    at emitCloseNT (internal/streams/destroy.js:81:10)
    at processTicksAndRejections (internal/process/task_queues.js:83:21) {
  code: 'ERR_STREAM_PREMATURE_CLOSE'
}
close 1
close 3

You must not leave any of the streams without a way to catch its errors. As the Node documentation puts it:

stream.pipeline() leaves dangling event listeners on the streams after the callback has been invoked. In the case of reuse of streams after failure, this can cause event listener leaks and swallowed errors.
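For the code in the question, one way to honour that is to give the PassThrough returned by s3Stream its own error path, so a failed upload surfaces through the pipeline instead of silently killing a stream. A sketch, assuming s3Store.upload reports completion through a promise (adapt it to whatever your uploader actually returns):

function s3Stream(file) {
    const pass = new stream.PassThrough()
    // Assumption: s3Store.upload returns a promise. If the upload fails,
    // destroy the PassThrough with that error so the outer pipeline's callback
    // receives it instead of an unexplained premature close.
    s3Store.upload(file, pass).catch(err => pass.destroy(err))
    return pass
}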

Node source (14.4):

  const onclose = () => {
    if (readable && !readableEnded) {
      if (!isReadableEnded(stream))
        return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE());
    }
    if (writable && !writableFinished) {
      if (!isWritableFinished(stream))
        return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE());
    }
    callback.call(stream);
  };

Upvotes: 13
