Reputation: 807
I have a stream that is composed from a chain of pipes.
I am using the event-stream package to create the building blocks of the pipes.
The code gets a file from S3, unzips it, parses it and sends the data to an async function.
I am trying to get the promise resolved when it finished handling that file.
How can I be sure that the whole chain has finished draining?
My current solution is below.
It looks bad, and I still think resolve() could be called
while there are data chunks left in, for example, the gzReader.
Thanks.
const inputStream = this.s3client.getObject(params).createReadStream()
inputStream.on('end', () => {
    console.log("Finished handling file " + fileKey)
    let stopInterval = setInterval(() => {
        if (counter == 0) {
            resolve(this.eventsSent)
            clearInterval(stopInterval)
        }
    }, 300)
})
const gzReader = zlib.createGunzip();
inputStream
    .pipe(gzReader)
    .pipe(es.split())
    .pipe(es.parse())
    .pipe(es.mapSync(data => {
        counter++
        this.eventsSent.add(data.data)
        asyncFunc(this.destinationStream, data.data)
            .then(() => {
                counter--
            })
            .catch((e) => {
                counter--
                console.error('Failed sending event ' + data.data + e)
            })
    }))
Upvotes: 4
Views: 629
Reputation: 1051
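The snippet never shows counter being initialized. Assuming it starts at zero, the first interval tick, 300 ms after the input stream's 'end' event, can find it still at zero and resolve. That can happen before the downstream pipes have passed their buffered chunks to your handler and incremented the counter.
So don't use setInterval ;) You don't need it.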
There is also no reason to use mapSync when you call an async function inside it. Use map instead, which passes your function the data and a callback (https://github.com/dominictarr/event-stream#map-asyncfunction). Don't forget to call the callback once your async function has finished, whether it succeeded or failed!
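As a rough sketch of that callback contract, the helper below adapts a promise-returning function to the (data, callback) shape that map expects. The name toMapCallback is hypothetical and not part of event-stream. It assumes that calling the callback with no arguments drops the item, and that a failed send should be logged rather than abort the stream, as in the question.

```javascript
// Hypothetical helper (not part of event-stream): wraps a promise-returning
// function so it can be handed to es.map, which expects (data, callback).
function toMapCallback(asyncFn) {
  return (data, callback) => {
    asyncFn(data).then(
      // Success: pass the item on to the next stage.
      () => callback(null, data),
      // Failure: log it and call back with no arguments, which drops the
      // item but lets the stream keep going (mirrors the question's catch).
      (err) => {
        console.error('Failed sending event', data, err);
        callback();
      }
    );
  };
}

// Intended use, assuming `es` is the event-stream package and `asyncFunc`
// and `destinationStream` are the ones from the question:
//   .pipe(es.map(toMapCallback((data) => asyncFunc(destinationStream, data.data))))
```

The callback is called exactly once per item on either path, so the map stage always knows when an item is done.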
Add wait(callback) as the last step in your pipe
(https://github.com/dominictarr/event-stream#wait-callback)
and resolve your promise inside that callback.
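For comparison, here is a minimal sketch of the same idea using only Node's built-in stream module instead of event-stream. It is a swapped-in technique, not the approach above: pipeline from stream/promises resolves only after the final stage has finished, so it plays the role of the last wait(callback) step. The names processGzipLines, splitLines and handleEvent are hypothetical, and the input is assumed to be gzipped newline-delimited JSON whose records have a data field, as in the question.

```javascript
const zlib = require('zlib');
const { StringDecoder } = require('string_decoder');
const { pipeline } = require('stream/promises');

// Splits decompressed bytes into lines, keeping any partial line for the
// next chunk (a stand-in for es.split()).
async function* splitLines(source) {
  const decoder = new StringDecoder('utf8');
  let tail = '';
  for await (const chunk of source) {
    tail += decoder.write(chunk);
    const lines = tail.split('\n');
    tail = lines.pop();
    yield* lines;
  }
  tail += decoder.end();
  if (tail) yield tail;
}

// Resolves with the set of successfully handled records only after every
// stage, including the last async handler call, has completed.
async function processGzipLines(input, handleEvent) {
  const sent = new Set();
  await pipeline(
    input,
    zlib.createGunzip(),
    splitLines,
    async function (lines) {
      for await (const line of lines) {
        if (!line.trim()) continue;          // skip blank lines
        const event = JSON.parse(line);      // stand-in for es.parse()
        try {
          await handleEvent(event.data);
          sent.add(event.data);
        } catch (e) {
          console.error('Failed sending event ' + event.data + ' ' + e);
        }
      }
    }
  );
  return sent;
}

// Possible use with the question's names (assumed, not tested against S3):
//   const sent = await processGzipLines(
//     s3client.getObject(params).createReadStream(),
//     (data) => asyncFunc(destinationStream, data));
```

Note that this sketch handles one record at a time, so it trades the question's concurrent sends for a simpler guarantee that nothing is in flight when the promise resolves.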
Upvotes: 2