Reputation: 1531
Let's say I have the following code:
try {
  let size = 0;
  await pipeline(
    fs.createReadStream('lowercase.txt'),
    async function* (source) {
      for await (const chunk of source) {
        size += chunk.length;
        if (size >= 1000000) {
          throw new Error('File is too big');
        }
        yield String(chunk).toUpperCase();
      }
    },
    fs.createWriteStream('uppercase.txt')
  );
  console.log('Pipeline succeeded.');
} catch (error) {
  console.log('got error:', error);
}
How do I make sure I am properly closing the streams in every single case? The node docs aren't much help -- they just tell me that I am going to have dangling event listeners:
stream.pipeline() will call stream.destroy(err) on all streams except:
- Readable streams which have emitted 'end' or 'close'.
- Writable streams which have emitted 'finish' or 'close'.
stream.pipeline() leaves dangling event listeners on the streams after the callback has been invoked. In the case of reuse of streams after failure, this can cause event listener leaks and swallowed errors.
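Do I need to keep references to the streams and destroy them myself in the catch? Just as a rough sketch of what I mean (same fs/pipeline as above, transform trimmed down):
const input = fs.createReadStream('lowercase.txt');
const output = fs.createWriteStream('uppercase.txt');
try {
  await pipeline(
    input,
    async function* (source) {
      for await (const chunk of source) {
        yield String(chunk).toUpperCase();
      }
    },
    output
  );
} catch (error) {
  console.log('got error:', error);
  // is manual cleanup like this necessary?
  input.destroy();
  output.destroy();
}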
Upvotes: 8
Views: 12075
Reputation: 5786
TLDR;
- pipe has those problems
- pipeline was created to solve them all, and it does
- pipeline is great if you have all of the parts from start to finish, but if not, there's an upcoming stream.compose function to address that

The accepted answer just brushes off pipeline, but it's specifically designed to solve this problem. pipe absolutely suffered from it (more below), but I've not found a case where pipeline doesn't properly close streams around files, http, etc. YMMV with random npm packages, but if it has a close or destroy function, as well as an on('error') event, it should be fine.
To demonstrate, this makes a call to the shell to see if our test files are open:
import { exec } from 'child_process';
import { promisify } from 'util';

const listOpenFiles = async () => {
  const { stdout } = await promisify(exec)("lsof -c node | awk '{print $9}'");
  // only show our test files
  const openFiles = stdout.split('\n').filter((str) => str.endsWith('case.txt'));
  console.log('***** open files:\n', openFiles, '\n-------------');
};
If you call that inside the loop in the example above:
for await (const chunk of source) {
  await listOpenFiles();
The output will keep repeating:
***** open files:
[
'/path/to/lowercase.txt',
'/path/to/uppercase.txt'
]
If you call it again after your catch, you can see that everything is closed.
***** open files:
[]
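If you don't want to shell out to lsof, a rough alternative is to keep references to the streams just for the check and look at their destroyed flags; this sketch assumes a modern node where fs streams have autoDestroy on by default:
import fs from 'fs';
import { pipeline } from 'stream/promises';

const source = fs.createReadStream('lowercase.txt');
const target = fs.createWriteStream('uppercase.txt');
try {
  await pipeline(
    source,
    async function* (input) {
      for await (const chunk of input) {
        yield String(chunk).toUpperCase();
      }
    },
    target
  );
} catch (error) {
  console.log('got error:', error);
}
// both flags are true whether the pipeline succeeded or threw
console.log('destroyed?', source.destroyed, target.destroyed);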
What the pipeline docs are referring to in the first 2 bullet points is that it won't close streams that have already closed because... well, they're already closed. As for the dangling listeners, those are indeed left on the individual streams passed to pipeline. However, in your example (a typical case), you're not keeping a reference to the individual streams anyway; they'll be garbage collected immediately after the pipeline completes. It's a warning about potential side effects if you have, for example, a constant reference to one of them.
// using this same instance over and over will end up with tons of dangling listeners
export const capitalizer = new Transform(// ...
Instead, it's better to have "clean" instances. Now that generator functions are easy to chain, it's less common to even have a reference to transforms at all, but you can simply make a function that returns a new instance rather than having a constant one:
export const createCapitalizer = () => new Transform(// ...
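If it helps, a bare-bones sketch of such a factory might look like this (not the exact transform from above, just the shape):
import { Transform } from 'stream';

// each call hands back a fresh instance, so no listeners pile up across pipelines
export const createCapitalizer = () =>
  new Transform({
    transform(chunk, encoding, callback) {
      callback(null, String(chunk).toUpperCase());
    },
  });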
In short, the above example is fine on all 3 points.
pipe
pipe, on the other hand, does indeed have the above problems with propagation.
const csvStream = (file) => {
  // does not expose file errors, nor clean up the file stream on parsing errors!!!
  return fs.createReadStream(file).pipe(createCsvTransform());
};
It's widely agreed that it's painful/unintuitive, but it's too late to change it now. I try to avoid it where I can, and I recommend pipeline where possible. However, it's important to note that pipeline requires having all of the parts together. So e.g. for the above, you need the final Writable target as well. You still have to use pipe in cases like this if you want to build just part of a chain. The workaround for this is easier to reason about in isolation:
const csvStream = (file) => {
  const fileStream = fs.createReadStream(file);
  const transform = createCsvTransform();
  // pass file errors forward
  fileStream.on('error', (error) => transform.emit('error', error));
  // close file stream on parsing errors
  transform.on('error', () => fileStream.close());
  return transform;
};
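For example, assuming createCsvTransform emits parsed rows, a caller now only has to listen on the stream it gets back:
// both file errors (e.g. a missing file) and parse errors surface in one place
csvStream('./data.csv')
  .on('data', (row) => console.log(row))
  .on('error', (error) => console.error('csv error:', error));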
However, there is good news. It's still experimental, but soon stream will expose a stream.compose function. It has all of the propagation/cleanup advantages of pipeline, but just returns a new stream. Essentially, it's what most people thought that pipe would do. ;)
// NO propagation or cleanup
readable.pipe(transform);
// automatic propagation and cleanup
stream.compose(readable, transform);
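As a sketch (assuming node >= 16.9, where the experimental compose is exposed, and the same createCsvTransform placeholder as above), the earlier helper could eventually look like this:
import fs from 'fs';
import { compose } from 'stream';

// one Duplex comes back; errors propagate and both halves get cleaned up for us
const csvStream = (file) => compose(fs.createReadStream(file), createCsvTransform());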
Until then, check out https://www.npmjs.com/package/stream-chain
pipeline and await
Note that the example above uses await pipeline(//..., but the linked docs are for the callback version. That one doesn't return a promise, so the await does nothing. From node 15 on up, you will generally want the stream/promises api here: https://nodejs.org/api/stream.html#streams-promises-api
import { pipeline } from 'stream/promises'; // NOT 'stream'
Before node 15, you can make it a promise with util's promisify:
import { pipeline } from 'stream';
import { promisify } from 'util';
await promisify(pipeline)(// ...
Or, to make it simpler for a whole file:
import * as stream from 'stream';
import { promisify } from 'util';
const pipeline = promisify(stream.pipeline);
I only mention this because, were you to use await with the callback version, it wouldn't actually be completed after the try/catch, so it might give the false impression that it failed to clean up when, in fact, it had yet to complete.
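To make that gotcha concrete, here's a small sketch of what awaiting the callback version actually does:
import fs from 'fs';
import { pipeline } from 'stream'; // callback version, NOT 'stream/promises'

// the callback form returns the destination stream, not a promise...
const result = pipeline(
  fs.createReadStream('lowercase.txt'),
  fs.createWriteStream('uppercase.txt'),
  (err) => {
    // ...so completion and errors only ever arrive here
  }
);

// resolves immediately; the copy may still be running
await result;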
Upvotes: 15
Reputation: 707238
So, I find many of the node.js stream compound operations such as pipeline() and .pipe() to be really bad/incomplete at error handling. For example, if you just do this:
fs.createReadStream("input.txt")
.pipe(fs.createWriteStream("output.txt"))
.on('error', err => {
console.log(err);
}).on('finish', () => {
console.log("all done");
});
You would expect that if there was an error opening the readStream, you'd get that error in your error handler here, but "no", that is not the case. An error opening that input file will be unhandled. There's some logic to that, as .pipe() returns the output stream and an input error isn't an error on the output stream, but when that's not passed through, it makes it very easy to miss errors on the input stream. The .pipe() operation could have listened for errors on the input stream and passed an error through (even if it was a pipeErr or something different), and then it could have also cleaned up the writeStream properly upon a read error. But .pipe() wasn't implemented that thoroughly. It seems to want to assume that there would never be an error on the input stream.
Instead, you have to separately save the readStream object and attach an error handler to it directly in order to see that error. So, I just don't trust this compound stuff any more, and the docs never really explain how to do proper error handling. I tried to look at the code for pipeline() to see if I could understand the error handling, and that did not prove to be a fruitful endeavor.
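So for the simple copy above, you end up having to do something like this (just a sketch of the idea):
const fs = require('fs');

const readStream = fs.createReadStream("input.txt");
const writeStream = fs.createWriteStream("output.txt");

// errors opening/reading input.txt only show up on the readStream itself
readStream.on('error', err => {
    console.log("read error:", err);
    // and we have to clean up the output side ourselves
    writeStream.destroy();
});

readStream.pipe(writeStream)
    .on('error', err => console.log("write error:", err))
    .on('finish', () => console.log("all done"));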
So, your particular problem seems like it could be done with a transform stream:
const fs = require('fs');
const { Transform } = require('stream');

const myTransform = new Transform({
    transform: function(chunk, encoding, callback) {
        let str = chunk.toString('utf8');
        this.push(str.toUpperCase());
        callback();
    }
});

function upperFile(input, output) {
    return new Promise((resolve, reject) => {
        // common function for cleaning up a partial output file
        function errCleanup(err) {
            fs.unlink(output, function(e) {
                if (e) console.log(e);
                reject(err);
            });
        }

        let inputStream = fs.createReadStream(input, {encoding: 'utf8'});
        let outputStream = fs.createWriteStream(output, {emitClose: true});

        // have to separately listen for read/open errors
        inputStream.on("error", err => {
            // have to manually close writeStream when there was an error reading
            if (outputStream) outputStream.destroy();
            errCleanup(err);
        });
        inputStream.pipe(myTransform)
            .pipe(outputStream)
            .on("error", errCleanup)
            .on("close", resolve);
    });
}
// sample usage
upperFile("input.txt", "output.txt").then(() => {
    console.log("all done");
}).catch(err => {
    console.log("got error", err);
});
As you can see, about 2/3 of this code is dealing with errors in a robust manner (the part that the built-in operations don't do properly).
Upvotes: 8