Brandon
Brandon

Reputation: 1004

How do I adjust timeout duration on retry using RxJS?

I'm working with the latest Angular and Typescript and RxJS 5.

Angular has currently made RxJS a necessity. I've used C# primarily for over 10 years and I'm very much used to Linq/Lambdas/fluent syntax which I assume formed the basis of Reactive.

I would like to make an Http get call with an increasing timeout value on retry, but I'm having a problem seeing how to do that and still keeping everything in the pipeline (not using external state).

I get that I can do this, but it will just retry using the same timeout value.

myHttpObservable.timeout(1000).retry(2);

The documentation for RxJS has been poor in many places and asking about it on here only got my question deleted out of existence, which is sad...so I was forced to look through the source.

Is there a way to retry with an increasing timeout duration each time in a way that keeps state in the pipeline? Also, I want an innitial timeout on the first attempt.

I've tried things similar to this at first, but realized the confusing retryWhen operator is not really intended for what I want:

myHttpObservable.timeout(1000).retryWhen((theSubject: Observable<Error>) => {
 return  aNewMyObservableCreatedinHere.timeout(2000);  
});

I know I could accomplish this using external state, but I'm basically looking for an elegant solution that, I think, is what they are kind of driving for with the reactive style of programming.

Upvotes: 11

Views: 9980

Answers (5)

yesterday_time
yesterday_time

Reputation: 23

I think the backoff isn't correct. This delay is the error to the next request, not the request to the next request, the timeout in the rxjs say that the source spends time is larger than the timeout, I want to change the timeout of the request, not add the delay between the error and next request, who know how to do it?

Like that:
Request: 8s, timeout: 3s, timeout change: 3 * 2 * retryIndex
expect: This request will retry twice and spend 8s finally.
Backoff: don't use the timeout, this request will succeed at the first time. If the request has another error, the next request will send after 3 * 2 * retryIndex.

Upvotes: 0

Jos&#233; Carballosa
Jos&#233; Carballosa

Reputation: 81

I use delayWhen to create a notifier function that makes retryWhen emit on increasing delays after each error occurrence. You can choose different time series by changing the formula used to calculate the delay between retries. See the two example formulas below for the geometric and exponential backoff retry strategies. Note also how I limit the number of retries and throw an error if I reach that limit.

const initialDelay = 1000;
const maxRetries = 4;
throwError('oops')
  .pipe(
    retryWhen(errors =>
      errors.pipe(
        delayWhen((_,i) => {
          // const delay = (i+1)*initialDelay;            // geometric
          const delay = Math.pow(2,i)*initialDelay;       // exponential
          console.log(`retrying after ${delay} msec...`);
          return timer(delay);
        }),
        take(maxRetries),
        concat(throwError('number of retries exceeded')))))
  .subscribe(
    x => console.log('value:', x),
    e => console.error('error:', e)
  );

Upvotes: 4

Alex Okrushko
Alex Okrushko

Reputation: 7382

There is an operator in backoff-rxjs npm package to deal with this case called retryBackoff. I described it in the article at blog.angularindepth.com, but in a nutshell it's this:

source.pipe(
  retryWhen(errors => errors.pipe(
      concatMap((error, iteration) => 
          timer(Math.pow(2, iteration) * initialInterval, maxInterval))));

Here are the sources for the more customizable version.

Upvotes: 7

Mark van Straten
Mark van Straten

Reputation: 9425

Based on additional input in the comments of my previous answer I have been experimenting with the .expand() operator to solve your requirement:

I want to make a get with timeout duration = X and if it times out, then retry with timeout = X+1

The following snippet starts with timeout = 500 and for each attempt the timeout is increased with attempt * 500 until maxAttempts has been reached or the result has been successfully retrieved:

getWithExpandingTimeout('http://reaches-max-attempts', 3)
  .subscribe(
    res => console.log(res),
    err => console.log('ERROR: ' + err.message)
  );

getWithExpandingTimeout('http://eventually-finishes-within-timeout', 10)
  .subscribe(
    res => console.log(res),
    err => console.log('ERROR: ' + err.message)
  );



/*
retrieve the given url and keep trying with an expanding 
timeout until it succeeds or maxAttempts has been reached
*/
function getWithExpandingTimeout(url, maxAttempts) {
  return get(url, 1)
    .expand(res => {
      if(res.error) {
        const nextAttempt = 1 + res.attempt;
        if(nextAttempt > maxAttempts) {
          // too many retry attempts done
          return Rx.Observable.throw(new Error(`Max attempts reached to retrieve url ${url}: ${res.error.message}`));
        }
        
        // retry next attempt
        return get(url, nextAttempt);
      }
      return Rx.Observable.empty(); // done with retrying, result has been emitted
    })
    .filter(res => !res.error);
}

/*
 retrieve info from server with timeout based on attempt
 NOTE: does not errors the stream so expand() keeps working
*/
function get(url, attempt) {
  return Rx.Observable.of(`result for ${url} returned after #${attempt} attempts`)
    .do(() => console.log(`starting attempt #${attempt} to retrieve ${url}`))
    .delay(5 * 500)
    .timeout(attempt * 500)
    .catch(error => Rx.Observable.of({ attempt: attempt, error }));
}
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.1.0/Rx.js"></script>

How does this work

Every value that is produced upstream or by the .expand() operator is emitted downstream AND used as input for the .expand() operator. This continues until no further values are emitted. By leveraging this behaviour the .get() function gets re-attempted with an increasing attempt when the emission contains an .error value inside.

The .get() function does not throw an Error because otherwise we need to catch it in the .expand() or the recursion will break unexpected.

When the maxAttempts is exceeded the .expand() operator throws an Error stopping the recursion. When no .error property is present in the emission then we expect this to be a succes result and emit an empty Observable, stopping the recursion.

NOTE: It uses .filter() to remove all emissions based on the .error property because all values produced by the .expand() are also emitted downstream but these .error values are internal state.

Upvotes: 6

Mark van Straten
Mark van Straten

Reputation: 9425

One of the biggest issues with RxJs5 at the moment is the documentation. It is really fragmented and not up to par with the previous version yet. By looking at the documentation of RxJs4 you can see that .retryWhen() already has an example for building an exponential backoff available which can be easily migrated towards RxJs5:

Rx.Observable.throw(new Error('splut'))
  .retryWhen(attempts => Rx.Observable.range(1, 3)
    .zip(attempts, i => i)
    .mergeMap(i => {
      console.log("delay retry by " + i + " second(s)");
      return Rx.Observable.timer(i * 1000);
    })
  ).subscribe();

Upvotes: 12

Related Questions