Reputation: 10384
I implemented an RxJS architecture based on the answer to this question: Pipe RxJS observable to existing subject
Later, I noticed that the observable closed seemingly at random, and I spent a lot of time tracking down the cause. In the end, it turned out that this snippet was causing it to close:
const delayed = Observable.of(arr.shift()).delay(1000);
merge(otherObs, delayed).subscribe(mySubject);
The of/delay observable was completing after 1 second, and it piped the completion to mySubject, making it unable to receive other values, including the ones sent by otherObs if they arrived after the timeout.
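To make the mechanism concrete, here is a minimal, dependency-free model of what I believe is happening. TinySubject, TinyObserver and emitOnce are hypothetical stand-ins written for this illustration, not RxJS's actual implementation; the only behavior they are meant to mirror is that a subject stops delivering values once it has been told to complete.

```typescript
// Simplified stand-in for an RxJS Subject (hypothetical, not the library's code).
interface TinyObserver<T> {
  next(value: T): void;
  complete(): void;
}

class TinySubject<T> implements TinyObserver<T> {
  private stopped = false;
  private listeners: Array<(value: T) => void> = [];

  listen(fn: (value: T) => void): void {
    this.listeners.push(fn);
  }

  next(value: T): void {
    if (this.stopped) return; // values arriving after completion are dropped
    this.listeners.forEach(fn => fn(value));
  }

  complete(): void {
    this.stopped = true;
  }
}

// A finite source: emits one value, then signals completion to its observer.
function emitOnce<T>(value: T, observer: TinyObserver<T>): void {
  observer.next(value);
  observer.complete();
}

const tinySubject = new TinySubject<string>();
const seen: string[] = [];
tinySubject.listen(v => seen.push(v));

emitOnce('from of/delay', tinySubject); // completion is forwarded to the subject
tinySubject.next('from otherObs');      // arrives too late and is silently dropped
```

After this runs, seen holds only the first value: the second one is lost because the finite source already closed the subject.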
Here is a StackBlitz that shows the problem.
How can I prevent mySubject from completing, without explicitly writing the callbacks?
Upvotes: 0
Views: 117
Reputation: 6424
This is a known issue. As a solution, I would suggest extending Subject to prevent its completion as long as other observers are still listening to it, as demonstrated below:
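import { Subject } from 'rxjs';

// Ignores complete() while more than one observer is still subscribed.
// Relies on Subject's `observers` array (deprecated as internal in RxJS 7).
class RescrictedCompletionSubject<T> extends Subject<T> {
  public complete(): void {
    if (this.observers.length <= 1) super.complete();
  }
}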
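Optionally, forward only next and error, so that the completion is never passed on. Wrap the calls in arrow functions, because passing mySubject.next directly would lose its this binding:
of(1).pipe(delay(1000)).subscribe({ next: v => mySubject.next(v), error: e => mySubject.error(e) });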
Last but not least:
merge(otherObs, delayed).subscribe(mySubject);
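For comparison, here is a small sketch of the two subscription styles side by side, assuming RxJS 6 or later. The delay is left out so that everything runs synchronously, and the names collect, pipedSubject and forwardedSubject are made up for this illustration.

```typescript
import { Subject, of } from 'rxjs';

// Hypothetical helper: records what a subject emits and whether it completed.
function collect(subject: Subject<number>) {
  const log = { values: [] as number[], completed: false };
  subject.subscribe({
    next: v => log.values.push(v),
    complete: () => { log.completed = true; },
  });
  return log;
}

// 1) Subscribing the subject directly forwards next, error AND complete.
const pipedSubject = new Subject<number>();
const pipedLog = collect(pipedSubject);
of(1).subscribe(pipedSubject);
pipedSubject.next(2); // ignored: the subject has already completed

// 2) Forwarding only next and error leaves the subject open.
const forwardedSubject = new Subject<number>();
const forwardedLog = collect(forwardedSubject);
of(1).subscribe({
  next: v => forwardedSubject.next(v),
  error: e => forwardedSubject.error(e),
});
forwardedSubject.next(2); // still delivered
```

In the first case the later value is lost and the subject reports completion; in the second case both values arrive and the subject stays open.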
Upvotes: 1