Reputation: 565
Suppose I want an observable to emit a value periodically until another observable emits. I can use timer and takeUntil to achieve that.
But then I want to process every emitted value and stop emitting (with an error) when some condition becomes true. So I wrote the following code:
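const { timer, Subject } = require('rxjs');
const { takeUntil, map } = require('rxjs/operators');

const s = new Subject();
let count = 0;

// Returns true for the first two items and false from the third one on.
function processItem(item) {
  count++;
  return count < 3;
}

const value = "MyValue";

timer(0, 1000).pipe(
  takeUntil(s),
  map(() => {
    if (processItem(value)) {
      console.log(`Processing done`);
      return value;
    } else {
      s.next(true);
      s.complete();
      console.error(`Processing error`);
      throw new Error(`Stop pipe`);
    }
  }),
).subscribe({
  // Report how the chain terminates.
  error: (err) => console.error(`Got error: ${err.message}`),
  complete: () => console.log(`Completed`),
});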
But instead of getting an error, my Observable completes. Only if I comment out the takeUntil(s) operator do I get the error.
It looks like when a pipe operator finishes, its value is not emitted immediately, but is remembered and emitted at the end of the next "iteration" of the pipe, then replaced by the new result, and so on. And in my situation the next iteration, in which the error should be emitted, is prevented by takeUntil. My question: am I right with that assumption, and if so, why is RxJS designed that way?
Upvotes: 1
Views: 299
Reputation: 96969
First of all, each Rx chain can emit one error or one complete notification, but never both. (See http://reactivex.io/documentation/contract.html, section "The Contract Governing Notifications".)
The takeUntil operator emits complete when its notifier Observable (s in your case) emits any next notification. This means that when s emits, the chain is completed and you will never receive any further error notifications.
The last and probably most confusing thing is that everything in RxJS happens synchronously unless you work with time (e.g. the delay operator) or you specifically use the observeOn operator with an asynchronous scheduler. So when you call s.next(true) inside map, this next notification is immediately propagated to takeUntil, which completes the chain before your throw statement is even reached. As mentioned above, you can receive one error or one complete notification, but never both.
It looks like you don't even need takeUntil here: if you throw an error inside map, it is automatically caught and sent further as an error notification (see "How to throw error from RxJS map operator (angular)"), and the chain is disposed automatically, so there is no point in trying to complete it afterwards with takeUntil.
Upvotes: 1