Christian Vincenzo Traina

Reputation: 10384

Pipe RxJS observable to existing subject, but not on completion

I implemented an RxJS architecture based on the answer to this question: Pipe RxJS observable to existing subject

Later, I noticed that the observable was closing seemingly at random, and I spent a lot of time looking for the cause. In the end, it turned out that this snippet was responsible:

const delayed = of(arr.shift()).pipe(delay(1000));
merge(otherObs, delayed).subscribe(mySubject);

The of/delay observable completed after 1 second and the completion was piped to mySubject, which made it unable to receive any further values, including those emitted by otherObs after the timeout.

Here is a StackBlitz that shows the problem.

How can I prevent mySubject from completing, without explicitly writing the callbacks?

Upvotes: 0

Views: 117

Answers (1)

Rafi Henig

Reputation: 6424

This is a known issue. As a solution, I would suggest extending Subject so that it does not complete as long as other observers are still listening to it, as demonstrated below:

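import { Subject } from 'rxjs';

// Ignores complete() while more than one observer is still subscribed.
class RestrictedCompletionSubject<T> extends Subject<T> {
  public complete(): void {
    if (this.observers.length <= 1) super.complete();
  }
}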

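Alternatively, forward only the next and error notifications and leave out complete. The handlers need to be wrapped in arrow functions (or bound), because passing mySubject.next directly loses its this context:

of(1).pipe(delay(1000)).subscribe({ next: v => mySubject.next(v), error: e => mySubject.error(e) });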
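Since the question asks to avoid writing the callbacks at every call site, the forwarding could be wrapped once in a small helper. The following is only a sketch: withoutComplete and ObserverLike are hypothetical names, not part of RxJS. ObserverLike is a minimal structural type that an RxJS Subject should satisfy, which keeps the snippet free of imports.

```typescript
// Minimal structural observer type (an assumption for this sketch);
// an RxJS Subject has these three methods.
interface ObserverLike<T> {
  next(value: T): void;
  error(err: unknown): void;
  complete(): void;
}

// Hypothetical helper: forwards next and error to `target`,
// but swallows complete so the target stays open.
function withoutComplete<T>(target: ObserverLike<T>): ObserverLike<T> {
  return {
    next: (value) => target.next(value),
    error: (err) => target.error(err),
    complete: () => {
      // intentionally not forwarded
    },
  };
}

// Intended usage with the names from the question:
// merge(otherObs, delayed).subscribe(withoutComplete(mySubject));
```

With this in place, the completion of the merged stream is no longer passed on to mySubject, while values and errors still reach it. Errors are forwarded on purpose here; whether that is desirable depends on the application.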

Last but not least, with mySubject created from the subclass above, the original subscription can remain unchanged:

merge(otherObs, delayed).subscribe(mySubject);

Upvotes: 1
