mshell_lauren

Reputation: 5236

Problems with making async calls in streams using node.js

I am having a problem with making async calls in my streams. It looks like for some reason if I have three streams and the middle stream makes an async call, the final stream never receives the 'end' event. I can simulate the behavior with some simple through streams and a timeout.

var through = require('through')

var options = {}

// streamOne: a pass-through that re-emits each row
var streamOne = function(){
  return through(function write(row){
    this.emit('data', row)
  })
}

// streamTwo: pauses, then re-emits each row after an async delay
var streamTwo = function(){
  return through(function write(row){
    this.pause()
    var that = this

    setTimeout(function(){
      that.emit('data', row)
      that.resume()
    }, 1000)
  })
}

options.streams = [streamOne(), streamTwo()]

I then pass this through to event-stream.

var es = require('event-stream')

var streams = []
//Pass stream one and stream two to es.pipe
streams.push(es.pipe.apply(this, options.streams))

//Then add the final stream (streamThree is defined elsewhere)
var endStream = es.pipe(
  new streamThree()
)
streams.push(endStream)

//And send it through the pipeline
es.pipeline.apply(this, streams)

As written, this does not work.

A couple of confusing points: if I remove streamOne, it works. If streamTwo does not make an async call, it works. This makes me think the problem lies in the way the two streams interact. However, if I add console.log calls throughout the code, everything appears to work: streamThree writes the data but never receives the 'end' event. *Note: streamThree is not using through; it uses the native Streams module.

Thoughts on why this is happening?

Upvotes: 1

Views: 714

Answers (1)

mshell_lauren

Reputation: 5236

After running some test cases, it looks like the I/O was not being handled properly by the pipeline or by through. I'm not entirely sure why, but I suspect a race condition between the pausing and resuming of the stream was causing the problem. I did a few things to clean up the code:

1) Simplify the pipeline. Rather than nesting es.pipe calls inside the pipeline, I passed the streams to es.pipeline directly. This made the flow of data between the streams easier to manage.

2) Instead of emitting 'data' from my regular through stream, I queued the data with this.queue, letting the module handle backpressure.

3) I used the event-stream method es.map to wrap the asynchronous call. This is a cleaner solution, since es.map handles the pausing and resuming of the stream for you and still returns a through stream.

Upvotes: 2
