Reputation: 6245
Consider the following:
const fs = require('fs')
const JSONStream = require('JSONStream')

var asyncFunction = function (data, callback) {
  doAsyncyThing(function (err, result) {
    // do some stuff
    return callback(err)
  })
}

fs.createReadStream('eupmc_lite_metadata_2016_04_15.json')
  .pipe(JSONStream.parse())
  .on('data', asyncFunction) // <- how to let asyncFunction complete before continuing
How does the stream know when asyncFunction has completed? Is there any way to use asynchronous functions from within streams?
Upvotes: 30
Views: 15685
Reputation: 1172
I think this is enough:
const fs = require('fs')
const JSONStream = require('JSONStream')
const { Transform } = require('node:stream')

// objectMode because JSONStream.parse() emits parsed objects, not buffers
const deferTransform = new Transform({
  objectMode: true,
  transform: (chunk, encoding, next) => {
    // the next chunk is not processed until next() is called
    Promise.resolve(JSON.stringify(chunk).toUpperCase()).then((data) =>
      next(null, data)
    )
  },
})

fs.createReadStream('eupmc_lite_metadata_2016_04_15.json')
  .pipe(JSONStream.parse())
  .pipe(deferTransform)
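If the asynchronous work can reject, the rejection should also be routed into the stream rather than left unhandled. A minimal sketch of the same idea, where doSomethingAsync is a hypothetical Promise-returning stand-in for whatever the real work is:
const { Transform } = require('node:stream')

// hypothetical Promise-returning worker, standing in for any real async operation
const doSomethingAsync = (obj) => Promise.resolve(JSON.stringify(obj).toUpperCase())

const safeTransform = new Transform({
  objectMode: true,
  transform: (chunk, encoding, next) => {
    doSomethingAsync(chunk)
      .then((data) => next(null, data)) // success: push the result downstream
      .catch(next)                      // rejection: surfaces as an 'error' event on the stream
  },
})
Calling next with an error errors the pipeline, so a downstream 'error' handler can deal with it.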
Upvotes: 2
Reputation: 1205
Check out transform streams. They give you the ability to run async code on a chunk, and then call a callback when you are finished. Here are the docs: https://nodejs.org/api/stream.html#transform_transformchunk-encoding-callback
As a simple example, you can do something like:
const fs = require('fs')
const JSONStream = require('JSONStream')
const { Transform } = require('stream')

class WorkerThing extends Transform {
  _transform(chunk, encoding, cb) {
    // the stream will not hand over the next chunk until cb is called
    asyncFunction(chunk, cb)
  }
}

// objectMode because JSONStream.parse() emits parsed objects
const workerThing = new WorkerThing({ objectMode: true })

fs.createReadStream('eupmc_lite_metadata_2016_04_15.json')
  .pipe(JSONStream.parse())
  .pipe(workerThing)
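This answers the original "how does the stream know" question: it knows because _transform is not considered finished until cb is invoked. As an illustration, a hypothetical asyncFunction backed by setTimeout makes the pacing visible; the next parsed object is only handed to the transform once the callback fires:
// hypothetical stand-in for the question's asyncFunction: finishes after 100 ms
function asyncFunction (data, callback) {
  setTimeout(() => {
    console.log('processed one chunk')
    callback(null, data) // signals completion; only now does the next chunk arrive
  }, 100)
}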
Upvotes: 24