fantom

Reputation: 565

Exception in observable pipe is suppressed

Suppose I want an observable to emit a value periodically until another observable emits. I can use timer and takeUntil to achieve that. But then I want to process every emitted value and stop emitting (with an error) when some condition becomes true. So I wrote the following code:

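const { timer, Subject } = require('rxjs');
const { takeUntil, map } = require('rxjs/operators');

const s = new Subject();
let count = 0;

// Returns true for the first two items and false from the third item on.
function processItem(item) {
  count++;
  return count < 3;
}

const value = "MyValue";

timer(0, 1000).pipe(
  takeUntil(s),
  map(() => {
    if (processItem(value)) {
      console.log(`Processing done`);
      return value;
    } else {
      // Notify takeUntil first, then try to fail the pipe.
      s.next(true);
      s.complete();
      console.error(`Processing error`);
      throw new Error(`Stop pipe`);
    }
  }),
).subscribe({
  // Subscribing is required; without it the timer never starts.
  next: (v) => console.log(`next: ${v}`),
  error: (err) => console.error(`error: ${err.message}`),
  complete: () => console.log(`complete`),
});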


But instead of getting an error, my Observable completes. I get the error only if I comment out the takeUntil(s) operator. It looks like when a pipe operator finishes, its value is not emitted immediately but remembered and emitted at the end of the next "iteration" of the pipe, then replaced by the new result, and so on. In my situation the next iteration, in which the error should be emitted, is prevented by takeUntil. Am I right in that assumption, and if so, why is RxJS designed that way?

Upvotes: 1

Views: 299

Answers (1)

martin

Reputation: 96969

First of all, each Rx chain can emit one error or one complete notification but never both. (See http://reactivex.io/documentation/contract.html section "The Contract Governing Notifications").
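To make the contract concrete, here is a toy model in plain JavaScript of a subscriber that honors only the first terminal notification. It is a sketch for illustration, not RxJS's actual implementation, and `createSafeSubscriber` and `log` are hypothetical names:

```javascript
// Toy model of the Observable contract; NOT RxJS's real implementation.
// createSafeSubscriber is a hypothetical helper used only for illustration.
function createSafeSubscriber(observer) {
  let stopped = false; // becomes true after the first error or complete
  return {
    next(value) {
      if (!stopped) observer.next(value);
    },
    error(err) {
      if (stopped) return; // a terminal notification was already delivered
      stopped = true;
      observer.error(err);
    },
    complete() {
      if (stopped) return;
      stopped = true;
      observer.complete();
    },
  };
}

const log = [];
const subscriber = createSafeSubscriber({
  next: (v) => log.push(`next ${v}`),
  error: (e) => log.push(`error ${e.message}`),
  complete: () => log.push('complete'),
});

subscriber.next('MyValue');
subscriber.complete();                    // first terminal notification wins
subscriber.error(new Error('Stop pipe')); // dropped: the chain already completed
subscriber.next('late');                  // dropped as well

console.log(log); // [ 'next MyValue', 'complete' ]
```

Once complete has been delivered, the later error has nowhere to go, which matches what you observe in your pipe.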

The takeUntil operator emits complete when its notification Observable (s in your case) emits any next notification. This means that when s emits, the chain completes and you will never receive any further error notifications.

The last and probably most confusing thing is that everything in RxJS happens synchronously unless you work with time (e.g. the delay operator) or you explicitly use the observeOn operator with an asynchronous scheduler. So when you call s.next(true) inside map, this next notification is immediately propagated to takeUntil, which completes the chain, and as mentioned above, you can receive one error or one complete notification but never both.
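The ordering can be sketched without RxJS at all. The following is a simplified stand-in, assuming a hand-rolled `TinySubject` (a hypothetical class, not the RxJS `Subject`) whose `next()` calls its listeners before returning, which is the synchronous behavior described above:

```javascript
// Toy stand-in for a Subject; NOT the RxJS class. TinySubject is hypothetical.
class TinySubject {
  constructor() {
    this.listeners = [];
  }
  subscribe(listener) {
    this.listeners.push(listener);
  }
  next(value) {
    // Listeners run right here, before next() returns to the caller.
    for (const listener of this.listeners) listener(value);
  }
}

const events = [];
let completed = false;
const s = new TinySubject();

// Plays the role of takeUntil(s): completes the chain on the first notifier value.
s.subscribe(() => {
  completed = true;
  events.push('complete delivered');
});

// Plays the role of the map callback from the question.
function project() {
  events.push('calling s.next');
  s.next(true);
  events.push('s.next returned');
  throw new Error('Stop pipe');
}

try {
  project();
} catch (err) {
  // By the time the error is thrown, the chain has already completed.
  events.push(completed ? `error dropped: ${err.message}` : `error delivered: ${err.message}`);
}

console.log(events);
```

The complete is recorded between "calling s.next" and "s.next returned", so the throw that follows arrives too late to be delivered.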

It looks like you don't even need takeUntil: if you throw an error inside map, it's automatically caught and sent downstream as an error notification (see "How to throw error from RxJS map operator (angular)"), and the chain is disposed automatically, so there's no point in trying to complete it afterwards with takeUntil.

Upvotes: 1
