Reputation: 5236
I am having a problem with making async calls in my streams. It looks like for some reason if I have three streams and the middle stream makes an async call, the final stream never receives the 'end' event. I can simulate the behavior with some simple through streams and a timeout.
var through = require('through')
var options = {}
// Synchronous pass-through stream
var streamOne = function(){
  return through(function write(row){
    this.emit('data', row)
  })
}
// Pauses, re-emits each row after a one-second timeout, then resumes
var streamTwo = function(){
  return through(function write(row){
    this.pause()
    var that = this;
    setTimeout(function(){
      that.emit('data', row)
      that.resume()
    }, 1000)
  })
}
options.streams = [new streamOne(), new streamTwo()]
I then pass this through to event-stream.
var es = require('event-stream')
var streams = []
// Pass stream one and stream two to es.pipe
streams.push(es.pipe.apply(this, options.streams))
// Then add the final stream (streamThree is a native stream, defined elsewhere)
var endStream = es.pipe(
  new streamThree()
)
streams.push(endStream)
// And send it through the pipeline
es.pipeline.apply(this, streams)
As written, this does not work.
A couple of confusing points: if I remove streamOne, it works! If streamTwo does not make an async call, it also works. This makes me think the problem lies in the way the two streams interact. However, if I console.log throughout the code, everything looks fine: streamThree writes the data but never registers the 'end' event. Note: streamThree does not use through; it uses the native Streams module instead.
Thoughts on why this is happening?
Upvotes: 1
Views: 714
Reputation: 5236
After running some test cases, it looks like the I/O was not being handled properly by the pipeline or by through. I'm not entirely sure why this was the case, but I think it was a race condition with the pausing and resuming of the stream that was causing the problem. I did a few things to clean up the code:
1) Simplify the pipeline. Rather than nesting es.pipe calls inside the pipeline, I just put the streams in directly. This helped to better manage the flow of data between the streams.
2) Instead of emitting data from my regular through stream, I queued the data using this.queue, letting the module handle potential backpressure.
3) I used the event-stream method es.map to handle the flow of the asynchronous call. I think this was a better solution since it more cleanly handles the pausing and resuming of the stream and still returns a through stream.
Upvotes: 2