Fergie
Fergie

Reputation: 6245

Node.js: can you use asynchronous functions from within streams?

Consider the following:

var asyncFunction = function(data, callback) {
  doAsyncyThing(function(data){
    // do some stuff
    return callback(err)
  })
}
fs.createReadStream('eupmc_lite_metadata_2016_04_15.json')
  .pipe(JSONstream.parse())
  .on('data', asyncFunction)   // <- how to let asyncFunction complete before continuing

How does the stream know when asyncFunction has completed? Is there any way to use asynchronous functions from within streams?

Upvotes: 30

Views: 15685

Answers (2)

Pasalino
Pasalino

Reputation: 1172

I think this is enough:

const Transform = require('node:stream').Transform

const deferTransform = new Transform({
  transform: (chunk, encoding, next) => {
    Promise.resolve(`${chunk.toString().toUpperCase()} `).then((data) =>
      next(null, data)
    );
  },
});


fs.createReadStream('eupmc_lite_metadata_2016_04_15.json')
.pipe(JSONstream.parse())
.pipe(deferTransform)

Upvotes: 2

Ryan Quinn
Ryan Quinn

Reputation: 1205

Check out transform streams. They give you the ability to run async code on a chunk, and then call a callback when you are finished. Here are the docs: https://nodejs.org/api/stream.html#transform_transformchunk-encoding-callback

As a simple example, you can do something like:

const Transform = require('stream').Transform
class WorkerThing extends Transform {
    _transform(chunk, encoding, cb) {
        asyncFunction(chunk, cb)
    }
}

const workerThing = new WorkerThing()

fs.createReadStream('eupmc_lite_metadata_2016_04_15.json')
.pipe(JSONstream.parse())
.pipe(workerThing)

Upvotes: 24

Related Questions