jigfox

Reputation: 18185

Retry inner Observable

I have an outer and an inner Observable. The inner Observable sometimes errors, which can be worked around with retries.

function sm$(val) {
  if (Math.random() > .4) {
    return Rx.Observable.throw(val)
  } else {
    return Rx.Observable.of(val)
  }
}

function sm(val) {
  return Rx.Observable.of(val)
    .switchMap(sm$)
    .catch(() => Rx.Observable.of(val).delay(1000).switchMap(sm))
}

Rx.Observable
  .interval(500)
  .switchMap(sm)
  .take(5)
  .subscribe(
    val => console.log("val:", val),
    err => console.log("err:", err),
    () => console.log("complete")
  )
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.4.0/Rx.min.js"></script>

This logs, for example:

val: 1
val: 2
val: 6
val: 10
val: 11
complete

When I return a delayed Observable in the catch block, it does not work properly: some values are just skipped. Without the delay it works as expected, but that's not what I want. I want the repeated execution to be delayed.

function sm$(val) {
  if (Math.random() > .4) {
    return Rx.Observable.throw(val)
  } else {
    return Rx.Observable.of(val)
  }
}

function sm(val) {
  return Rx.Observable.of(val)
    .switchMap(sm$)
    .catch(() => Rx.Observable.of(val).switchMap(sm))
    // removed the delay(1000)
}

Rx.Observable
  .interval(500)
  .switchMap(sm)
  .take(5)
  .subscribe(
    val => console.log("val:", val),
    err => console.log("err:", err),
    () => console.log("complete")
  )
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.4.0/Rx.min.js"></script>

This log always looks like the following, which is what I expect:

val: 0
val: 1
val: 2
val: 3
val: 4
complete

Upvotes: 0

Views: 257

Answers (2)

Picci

Reputation: 17752

I think the problem is related to the fact that you use a delay of 1000 ms while at the same time you complete the stream after the emission of 5 events, each occurring every 500 ms.

This means that a delayed emission, delayed because of the catch block, can arrive after the stream has been completed.

If you reduce the delay, you may get what you are looking for. I have also added some logging to give a better picture:

import {Observable} from 'rxjs';

function sm$(val) {
    if (Math.random() > .4) {
      return Observable.throw(val)
    } else {
      return Observable.of(val)
    }
}

function sm(val) {
    return Observable.of(val)
      .switchMap(sm$)
      .catch(() => {
        console.log('error', val);
        return Observable.of(val).delay(10).switchMap(d => sm(d));
      })
}

Observable
    .interval(500)
    .switchMap(sm)
    .take(5)
    .subscribe(
      val => console.log("val:", val),
      err => console.log("err:", err),
      () => console.log("complete")
    )

Upvotes: 0

martin

Reputation: 96891

The problem is that you're using Observable.interval(500).switchMap(sm).

In particular, the outer switchMap is the issue. The source Observable.interval keeps emitting while the inner Observable is still waiting on delay(1000), so by the time the delayed inner Observable wants to emit after 1000 ms, switchMap has already unsubscribed from it.

So it looks like you could use concatMap instead of the outer switchMap:

function sm$(val) {
  if (Math.random() > .4) {
    return Rx.Observable.throw(val)
  } else {
    return Rx.Observable.of(val)
  }
}

function sm(val) {
  return Rx.Observable.of(val)
    .switchMap(sm$)
    .catch(() => Rx.Observable.of(val).delay(1000).switchMap(sm))
}

Rx.Observable
  .interval(500)
  .concatMap(sm)
  .take(5)
  .subscribe(
    val => console.log("val:", val),
    err => console.log("err:", err),
    () => console.log("complete")
  )
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.4.0/Rx.min.js"></script>
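
To see why the outer operator matters, here is a rough timeline model in plain JavaScript. It is only a sketch and does not use RxJS. The `simulate` function and the `failures` array are hypothetical names: `failures[i]` is a fixed number of failed attempts for value `i`, standing in for `Math.random()`. It assumes a successful attempt takes no time and each failed attempt costs one retry delay.

```javascript
// Simplified model (not RxJS itself): which source values reach the subscriber?
// failures[i] = how many times value i fails before it succeeds (assumed, fixed pattern).
function simulate(strategy, failures, options = {}) {
  const period = options.period === undefined ? 500 : options.period;
  const retryDelay = options.retryDelay === undefined ? 1000 : options.retryDelay;
  const take = options.take === undefined ? 5 : options.take;

  const emitted = [];
  let busyUntil = 0; // only used by the 'concat' strategy

  for (let i = 0; i < failures.length && emitted.length < take; i++) {
    const arrival = (i + 1) * period;          // interval emits i at this time
    const duration = failures[i] * retryDelay; // time spent waiting on retries

    if (strategy === 'switch') {
      // switchMap unsubscribes from the inner Observable when the next
      // source value arrives, so a slow inner never gets to emit.
      const nextArrival = (i + 2) * period;
      if (arrival + duration < nextArrival) {
        emitted.push(i);
      }
    } else {
      // concatMap queues source values and runs the inners one after
      // another, so every value is eventually emitted, in order.
      const start = Math.max(arrival, busyUntil);
      busyUntil = start + duration;
      emitted.push(i);
    }
  }
  return emitted;
}

// One possible failure pattern, chosen for illustration only.
const pattern = [1, 0, 0, 1, 1, 1, 0, 1, 1, 1, 0, 0];
console.log('switch:', simulate('switch', pattern));
console.log('concat:', simulate('concat', pattern));
```

In this model, with a 500 ms period and a 1000 ms retry delay, any value that fails even once is dropped under the switch strategy, which is consistent with the gaps in the question's log. The concat strategy emits every value, at the cost of falling behind the interval while retries are pending.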

Upvotes: 2
