Eugene Shmorgun

Reputation: 2065

Why does Observable.race not work if one of the observables stops emitting events?

I'd like to implement WebSocket reconnection in a web app for when the internet connection is lost. To detect a lost connection I use a ping-pong approach: the client sends a ping message and the server replies with a pong message.

When the web app loads, I send an initial ping message and start listening for the reply on the socket, something like this:

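this.websocket.onmessage = (evt) => {
  try {
    const websocketPayload: any = JSON.parse(evt.data);

    // A payload with pong == 1 is the server's reply to a ping
    if (websocketPayload.pong !== undefined && websocketPayload.pong == 1) {
      this.pingPong$.next('pong');
    }
    // ... (the rest of the handler is not shown)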

This means the internet connection looks fine and we can continue. I also have the following code:

Observable.race(
  Observable.of('timeout').delay(5000).repeat(),
  this.pingPong$
).subscribe((data) => {
  console.log("[ping-pong]:", data);
  if (data == 'pong') {
    // wait 5 seconds, then send the next ping
    Observable.interval(5000).take(1).subscribe(() => {
      console.log("[ping-pong]:sending ping");
      this.send({ping: 1});
    });
  } else if (data == 'timeout') {
    // show reconnect screen and start reconnect
    console.error("It looks like websocket connection lost");
  }
});

But when the this.pingPong$ subject stops emitting events (.next() is never called, because no response arrives after I break the connection manually), I expected Observable.race to deliver the values of this observable instead:

Observable.of('timeout').delay(5000).repeat()

But my subscribe callback is never called once this.pingPong$ stops emitting.

Why?

Thank you

Upvotes: 2

Views: 1419

Answers (1)

martin

Reputation: 96891

race picks and keeps subscribed to the first Observable that emits.
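To make that rule concrete, here is a minimal sketch that models it without RxJS. The names createSubject, PushSource and ValueHandler are made up for this illustration and are not part of any library, and the 5-second timer is replaced by a second manually triggered source so the example stays deterministic.

```typescript
// NOT RxJS: a hand-rolled model that only mimics how race picks a winner.
type ValueHandler<T> = (value: T) => void;
// Subscribing returns an unsubscribe function.
type PushSource<T> = (handler: ValueHandler<T>) => () => void;

// Hypothetical hot source with a manual emit(), similar in spirit to a Subject.
function createSubject<T>(): { source: PushSource<T>; emit: (value: T) => void } {
  let handlers: ValueHandler<T>[] = [];
  const source: PushSource<T> = (handler) => {
    handlers.push(handler);
    return () => { handlers = handlers.filter((h) => h !== handler); };
  };
  const emit = (value: T): void => { handlers.slice().forEach((h) => h(value)); };
  return { source, emit };
}

// The first source to emit wins; every other source is unsubscribed for good.
function race<T>(...sources: PushSource<T>[]): PushSource<T> {
  return (handler) => {
    let winner = -1;
    const unsubs: Array<() => void> = [];
    sources.forEach((src, i) => {
      unsubs[i] = src((value) => {
        if (winner === -1) {
          winner = i;
          unsubs.forEach((u, j) => { if (j !== i) u(); });
        }
        if (winner === i) handler(value);
      });
    });
    return () => unsubs.forEach((u) => u());
  };
}

const pingPong = createSubject<string>();
const timeout = createSubject<string>(); // stands in for the delayed 'timeout' source
const received: string[] = [];
race(timeout.source, pingPong.source)((v) => { received.push(v); });

pingPong.emit('pong');   // pingPong emits first and wins the race
timeout.emit('timeout'); // dropped: race has already unsubscribed from this source
timeout.emit('timeout'); // still dropped, however long pingPong stays silent
console.log(received);
```

Only the first 'pong' reaches the handler. Once a winner is chosen, the losing source has no listener left, so a later silence of the winner cannot bring it back.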

So if this.pingPong$ starts emitting and then stops, it makes no difference, because race stays subscribed to this.pingPong$ and has already unsubscribed from the other Observables. You might want to emit just one value from this.pingPong$ and then repeat the whole process. For example, like the following:

// pipeable operators, available since RxJS 5.5
import { take, repeat } from 'rxjs/operators';

Observable.race(
  Observable.of('timeout').delay(5000).repeat(),
  this.pingPong$
)
  .pipe(
    take(1), // complete the chain immediately
    repeat() // resubscribe after take(1) completes the chain
  )
  .subscribe(...);

Obviously it mostly depends on what you want to do but I hope you get the point.
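For illustration only, the take(1) plus repeat() idea can be modelled without RxJS as "run one round of the race, tear everything down on the first value, then start a new round". The helper names below (createSubject, raceTakeOneRepeat) are hypothetical, and the timeout is again a manually triggered source rather than a real timer.

```typescript
// NOT RxJS: a hand-rolled model of race(...).pipe(take(1), repeat()).
type ValueHandler<T> = (value: T) => void;
type PushSource<T> = (handler: ValueHandler<T>) => () => void;

// Hypothetical hot source with a manual emit(), similar in spirit to a Subject.
function createSubject<T>(): { source: PushSource<T>; emit: (value: T) => void } {
  let handlers: ValueHandler<T>[] = [];
  const source: PushSource<T> = (handler) => {
    handlers.push(handler);
    return () => { handlers = handlers.filter((h) => h !== handler); };
  };
  const emit = (value: T): void => { handlers.slice().forEach((h) => h(value)); };
  return { source, emit };
}

// Each round ends on its first value; then all sources are subscribed again.
function raceTakeOneRepeat<T>(sources: PushSource<T>[], handler: ValueHandler<T>): () => void {
  let unsubs: Array<() => void> = [];
  let stopped = false;
  const startRound = (): void => {
    let settled = false;
    unsubs = sources.map((src) =>
      src((value) => {
        if (settled || stopped) return;
        settled = true;             // take(1): only the first value of this round counts
        unsubs.forEach((u) => u()); // tear down every source of this round
        handler(value);
        if (!stopped) startRound(); // repeat(): start a fresh race
      })
    );
  };
  startRound();
  return () => { stopped = true; unsubs.forEach((u) => u()); };
}

const pong = createSubject<string>();
const timer = createSubject<string>(); // stands in for the delayed 'timeout' source
const log: string[] = [];
const stopWatching = raceTakeOneRepeat([timer.source, pong.source], (v) => { log.push(v); });

pong.emit('pong');     // round 1: pong wins
timer.emit('timeout'); // round 2: pong stays silent, so the timeout is delivered
pong.emit('pong');     // round 3: pong wins again
stopWatching();
console.log(log);
```

Because every round is a fresh race, the timeout source gets another chance each time. In the real RxJS chain the 'timeout' Observable is cold, so each resubscription also restarts its 5-second delay.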

Upvotes: 4
