2652763
2652763

Reputation: 513

Using promises with streams in node.js

I've refactored a simple utility to use promises. It fetches a pdf from the web and saves it to disk. It should then open the file in a pdf viewer once saved to disk. The file appears on disk and is valid, the shell command opens the OSX Preview application, but a dialog pops up complaining that the file is empty.

What's the best way to execute the shell function once the filestream has been written to disk?

// download a pdf and save to disk
// open pdf in osx preview for example
download_pdf()
  .then(function(path) {
    // `path` is the value download_pdf()'s promise resolves with.
    // The exit-code comparison is evaluated but its result is discarded here;
    // wrap it in an `if` to react to a failed `open`.
    shell.exec('open ' + path).code !== 0;
  });

/**
 * Starts a request for a fixed URL, pipes the response into a file write
 * stream at a fixed local path, and returns the promise produced by
 * streamToPromise().
 *
 * @returns {Promise} Whatever streamToPromise() returns for the request stream.
 */
function download_pdf() {
  const path = '/local/some.pdf';
  const url = 'http://somewebsite/some.pdf';
  const stream = request(url);
  // NOTE(review): `write` (the value returned by pipe()) is never used below.
  const write = stream.pipe(fs.createWriteStream(path))
  // NOTE(review): the promise is built from `stream` (the request/read side),
  // not from `write`, so it tracks the source's events rather than the file's.
  return streamToPromise(stream);
}

/**
 * Wraps a stream's "end" / "error" events in a promise.
 *
 * @param stream Stream to observe; the code reads `stream.dests[0].path`, so it
 *   presumably expects a request stream that has already been piped to a file
 *   write stream — TODO confirm against the `request` library.
 * @returns {Promise} Rejects with the value passed to the stream's "error" event.
 */
function streamToPromise(stream) {
  return new Promise(function(resolve, reject) {
    // resolve with location of saved file
    // NOTE(review): `resolve(...)` is a call expression, so it runs right here
    // while the listener is being registered, and its return value (not a
    // function) is what gets passed to stream.on("end", ...).
    stream.on("end", resolve(stream.dests[0].path));
    stream.on("error", reject);
  })
}

Upvotes: 33

Views: 50905

Answers (5)

Alan Dong
Alan Dong

Reputation: 4095

Stream promises API

The stream promises API, added in Node.js v15, provides a promise-returning `finished` function (from `stream/promises`):

const fs = require('node:fs');
const streamPromises = require('node:stream/promises');

// Source stream whose completion is awaited below.
const rs = fs.createReadStream('archive.tar');

/**
 * Waits for `finished(rs)` to settle, then logs a completion message.
 * A rejection from `finished` propagates to the caller.
 */
const run = async () => {
  await streamPromises.finished(rs);
  console.log('Stream is done reading.');
};

run().catch((err) => console.error(err));
// Drain the stream: nothing else consumes `rs`, so resume() is what makes it flow.
rs.resume();

https://nodejs.org/api/stream.html#stream_event_finish

Upvotes: 17

Snowbldr
Snowbldr

Reputation: 786

This can be done very nicely using the promisified pipeline function. Pipeline also provides extra functionality, such as cleaning up the streams.

// Promise-returning variant of stream.pipeline, built with util.promisify.
const pipeline = require('util').promisify(require( "stream" ).pipeline)

// Pipe the HTTP response into the file; the returned promise settles once the
// pipeline has completed (or failed), after which the file is opened.
pipeline(
  request('http://somewebsite/some.pdf'),
  fs.createWriteStream('/local/some.pdf')
).then(()=>
  // The arrow function returns the result of the exit-code comparison.
  shell.exec('open /local/some.pdf').code !== 0
);

Upvotes: 2

Vasyl Boroviak
Vasyl Boroviak

Reputation: 6138

After a bunch of tries I found a solution which works fine all the time. See JSDoc comments for more info.

/**
 * Pipes `input` into `output` and reports the outcome through a promise.
 *
 * The promise settles exactly once: it resolves on the first "finish" or "end"
 * event of `output`, and rejects on the first "error" event of either stream.
 * Whichever happens first also triggers a one-time `close()` call on each
 * stream that exposes a `close` member.
 *
 * @param input {stream.Readable} Stream to read from
 * @param output {stream.Writable} Stream to write to
 * @return Promise Resolves with no value on success; rejects with the emitted error.
 */
function promisifiedPipe(input, output) {
    let settled = false;

    // One-shot cleanup. Returns true only for the first caller, so later
    // events cannot settle the promise or close the streams a second time.
    const finalize = () => {
        if (settled) {
            return undefined;
        }
        settled = true;
        if (output.close) {
            output.close();
        }
        if (input.close) {
            input.close();
        }
        return true;
    };

    return new Promise((resolve, reject) => {
        const onSuccess = () => {
            if (finalize()) {
                resolve();
            }
        };
        const onFailure = (error) => {
            if (finalize()) {
                reject(error);
            }
        };

        input.pipe(output);
        input.on('error', onFailure);

        output.on('finish', onSuccess);
        output.on('end', onSuccess);
        output.on('error', onFailure);
    });
}

Usage example:

/**
 * Request handler that streams the file named by `req.params.file` into the
 * response and forwards any failure to `next`.
 */
function downloadFile(req, res, next) {
  // NOTE(review): req.params.file is used as a filesystem path as-is —
  // confirm it is validated before it reaches this handler.
  const source = fs.createReadStream(req.params.file);
  promisifiedPipe(source, res).catch(next);
}

Update. I've published the above function as a Node module: http://npm.im/promisified-pipe

Upvotes: 14

kharandziuk
kharandziuk

Reputation: 12920

Another solution can look like this:

/**
 * Collects everything a readable stream emits and returns it as one string.
 *
 * @param {stream.Readable} readable Stream to consume.
 * @returns {Promise<string>} Resolves with the full contents once the collecting
 *   writable has finished; rejects if the source or the collector emits "error".
 */
const streamAsPromise = (readable) => {
  const result = []
  const w = new Writable({
    write(chunk, encoding, callback) {
      result.push(chunk)
      callback()
    }
  })
  readable.pipe(w)
  return new Promise((resolve, reject) => {
    w.on('finish', resolve)
    w.on('error', reject)
    // pipe() does not forward source errors to the destination, so without
    // this listener a failing source would leave the promise pending forever.
    readable.on('error', reject)
    // Concatenate the raw chunks before decoding: decoding each chunk on its own
    // corrupts multi-byte characters that straddle a chunk boundary.
  }).then(() => Buffer.concat(result).toString())
}

and you can use it like:

// The promise resolves with the collected contents, so take it as the callback argument.
streamAsPromise(fs.createReadStream('secrets')).then((res) => console.log(res))

Upvotes: 4

Jaromanda X
Jaromanda X

Reputation: 1

In this line

stream.on("end", resolve(stream.dests[0].path));

you are executing resolve immediately, and the result of calling resolve (which will be undefined, because that's what resolve returns) is used as the argument to stream.on, which is not what you want at all.

.on's second argument needs to be a function, rather than the result of calling a function.

Therefore, the code needs to be

stream.on("end", () => resolve(stream.dests[0].path));

or, if you're old school:

stream.on("end", function () { resolve(stream.dests[0].path); });

another old school way would be something like

stream.on("end", resolve.bind(null, stream.dests[0].path));

No, don't do that :p see comments

Upvotes: 29

Related Questions