John Estess

Reputation: 636

RxJS subscribe never finishes

I'm fairly new to RxJS. When I run the code below, the whole stream is read and the console statements inside the pipeline are printed, but I never see "Subscribe done." and I don't know why. What will it take to get this stream to finish? Is something obviously wrong?

const readline$ = RxNode.fromReadLineStream(rl)
    .filter((element, index, observable) => {
        if (index >= range.start && index < range.stop) {
            console.log(`kept line is ${JSON.stringify(element)}`);
            return true;
        } else {
            console.log(`not keeping line ${JSON.stringify(element)}`);
            return false;
        }
    })
    .concatMap(line => Rx.Observable.fromPromise(myFunction(line)))
    .do(response => console.log(JSON.stringify(response)));

readline$.subscribe(
    i => { console.log(`Subscribe object: ${util.inspect(i)}`); },
    err => { console.error(`Subscribe error: ${util.inspect(err)}`); },
    () => {                              // the completion callback receives no argument
        console.log("Subscribe done.");  // NEVER CALLED
        anotherFunc();                   // NEVER CALLED
    }
);

Upvotes: 1

Views: 570

Answers (1)

martin

Reputation: 96979

You can see from the source code that it sends the complete notification only when the source stream emits the close event: https://github.com/Reactive-Extensions/rx-node/blob/master/index.js#L100-L102

So if you need the complete handler to be called, you'll need to close the stream yourself; see "How to close a readable stream (before end)?".
In other words, the Observable doesn't complete automatically after reading the entire file.
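
To illustrate the mechanism, here is a minimal sketch using only Node's built-in `events` module. It is not the rx-node implementation: `subscribeToLines` is a hypothetical helper that mimics the behaviour described above (completion wired to the close event only), and a plain `EventEmitter` stands in for the readline interface so the example runs without a file or stdin.

```javascript
const { EventEmitter } = require('events');

// Hypothetical helper (not the rx-node API): adapts any emitter that fires
// 'line', 'error' and 'close' events, such as a readline.Interface,
// to next/error/complete callbacks.
function subscribeToLines(source, onNext, onError, onComplete) {
  source.on('line', onNext);
  source.on('error', onError);
  // Completion is tied to 'close' only; the last 'line' event does not trigger it.
  source.once('close', onComplete);
}

// Stand-in for a readline.Interface (an assumption made to keep the sketch deterministic).
const rl = new EventEmitter();
const log = [];

subscribeToLines(
  rl,
  line => log.push(`next: ${line}`),
  err => log.push(`error: ${err.message}`),
  () => log.push('done')
);

rl.emit('line', 'first');
rl.emit('line', 'second');

// Every line has been delivered, but nothing has signalled completion yet.
const completedBeforeClose = log.includes('done');

// With a real readline.Interface this step would be rl.close(), which emits 'close'.
rl.emit('close');

console.log(completedBeforeClose); // false
console.log(log);                  // [ 'next: first', 'next: second', 'done' ]
```

With a real `readline.Interface`, the equivalent step would be calling `rl.close()` once you have the lines you need (for example, when the index reaches `range.stop`), so that the close event fires and the completion callback runs.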

Upvotes: 1
