Motassem Kassab
Motassem Kassab

Reputation: 1815

How to use retryWhen with a function that returns a Boolean?

Here's my code:

this._http.post(this._url_get + extension, '', { headers: headers })
    .map(res => res['_body'])
    .retryWhen(errors => {return responseErrorProcess(errors)})

now I need to catch exceptions and pass them to my responseErrorProcess() which returns true if it needs to retry

I could not figure out how to retrieve the exceptions from errors, this is how it looks:

Subject_isScalar: falseclosed: falsehasError: falseisStopped: falseobservers: Array[0]thrownError: null__proto__: Observable`

It doesn't seem to contain errors about the exceptions that occurs, plus I couldn't figure out what should I return in order to actually retry or not.

Upvotes: 3

Views: 8134

Answers (2)

BeetleJuice
BeetleJuice

Reputation: 40896

retryWhen should return an Observable. The retry occurs once that observable emits:

.retryWhen(errors => 
    //switchMap to retrieve the source error
    errors.switchMap(sourceErr => 
        //send source to processor
        responseErrorsProcess(sourceErr) ? 
        //if result is TRUE, emit (will cause retry). Else, pass on the error
        Observable.of(true): Observable.throw(sourceErr)
    )
)

If you want to complete instead of error when your processor returns false, replace Observable.throw() with Observable.empty()

Upvotes: 8

martin
martin

Reputation: 96889

The callable to retryWhen() needs to return an Observable that emits complete or error to end the stream or emits a value to resubscribe.

For example this code completes without emitting the error because of Observable.empty():

Observable.create(obs => {
    obs.next(1);
    obs.next(2);
    obs.error('error from source');
  })
  .retryWhen((errors) => {
      errors.subscribe(sourceError => console.log(sourceError));
      return Observable.create(obs => obs.error('inner error'));
  })
  .subscribe(
    val => console.log(val),
    err => console.log('error', err),
    _ => console.log('complete')
  );

The error from source Observable is emitted to errors as next. See source code: https://github.com/ReactiveX/rxjs/blob/master/src/operator/retryWhen.ts#L86

This prints to console:

1
2
error inner error
error from source

See live demo: http://plnkr.co/edit/Fajsb54WJwB8J8hkUC6j?p=preview

Edit based on comments bellow:

Look at the documentation for retryWhen():

An error will cause the emission of the Throwable that cause the error to the Observable returned from notificationHandler. If that Observable calls onComplete or error then retry will call complete or error on the child subscription. Otherwise, this Observable will resubscribe to the source observable, on a particular Scheduler.

So the Observable returned from the callback is responsible to decide whether to resubscribe. If it emits next(), then resubscribe. If it emits error() or complete() pass those to the child Observer.

For example you could do (I didn't test this code):

return response.retryWhen((errors) => {
    var retrySource = new Subject();
    errors.subscribe(error => {
        if (this.responseErrorProcess(error)) retrySource.next();
        else retrySource.complete();
    });
    return retrySource;
});

According to your inner logic you trigger the correct message on retrySource.

Upvotes: 6

Related Questions