Reputation: 636
I'm fairly new with rxjs. I'm calling a function below and the complete stream is read and the read console statements are printed, but I never see a "Subscibe done" and I don't know why. What will it take to get this stream to finish? Is something obviously wrong?
const readline$ = RxNode.fromReadLineStream(rl)
.filter((element, index, observable) => {
if (index >= range.start && index < range.stop) {
console.log(`kept line is ${JSON.stringify(element)}`);
return true;
} else {
console.log(`not keeping line ${JSON.stringify(element)}`);
return false;
}
})
.concatMap(line => Rx.Observable.fromPromise(myFunction(line)))
.do(response => console.log(JSON.stringify(response)));
readline$.subscribe(i => { console.log(`Subscribe object: ${util.inspect(i)}`); },
err => { console.error(`Subscribe error: ${util.inspect(err)}`); },
done => { console.log("Subscribe done."); // NEVER CALLED
anotherFunc(); // NEVER CALLED
}
);
Upvotes: 1
Views: 570
Reputation: 96979
You can see from the source code that it send complete notification only when the source stream emits close
event. https://github.com/Reactive-Extensions/rx-node/blob/master/index.js#L100-L102
So if you need the proper complete handler to be called you'll need to close the stream yourself, see How to close a readable stream (before end)?.
In other words the Observable doesn't complete automatically after reading the entire file.
Upvotes: 1