durgesh rao

Reputation: 93

RxJS retryWhen: reset waiting interval

I want to trigger retryWhen() with an increasing time interval:

socketResponse.retryWhen(attempts => {
    return attempts.zip(Observable.range(1, 4)).mergeMap(([error, i]) => {
        console.log(`[socket] Wait ${i} seconds, then retry!`);
        if (i === 4) {
            console.log(`[socket] maxReconnectAttempts ${i} reached!`);
        }
        return Observable.timer(i * 1000);
    });
});

The above code works fine. Current implementation output:

on connection error (1st time)


on connection successful


// successful connection.

on connection error (2nd time)


Now I want to reset the waiting time once the socket connection is successful, so that the next error starts again from the shortest wait.

Desired output:

on connection error (1st time)


on connection successful


// successful connection.

on connection error (2nd time)


But I don't know how to reset the retryWhen() time interval.

Upvotes: 7

Views: 2139

Answers (3)

José Carballosa

Reputation: 81

Use delayWhen to build a notifier that emits after an increasing delay each time an error occurs. In the snippet below, I retry at increasing multiples of 0.5 sec, but you can use a different time series if you prefer. The snippet also shows how to limit the number of retries and throw an error once that limit is reached.

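import { throwError, timer } from 'rxjs';
import { concat, delayWhen, retryWhen, take, tap } from 'rxjs/operators';

socketResponse$
  .pipe(
    retryWhen(errors =>
      errors.pipe(
        // delay(500),                        // fixed delay: 0.5 sec
        // increasing delay: i * 0.5 sec (i is zero-based, so the first retry is immediate)
        delayWhen((_, i) => timer(i * 500)),
        tap(_ => console.log('retrying...')),
        take(2),                              // max. retries: 2
        // once the retries are used up, fail the stream with an error
        concat(throwError('number of retries exceeded'))
      )
    )
  )
  .subscribe(
    x => console.log('value:', x),
    e => console.error('error:', e)
  );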

Upvotes: 0

Starscream

Reputation: 1148

I also had the same problem. My goal was to wrap a socket connection with observables. I wanted the network observable to never complete (unless specifically asked to) and infinitely retry connecting in case of any error.

Here's how I accomplished this (with RxJS version 6+). This is my retryWhen notifier: it retries indefinitely, with a wait that grows by scalingDuration on each attempt and is capped at maxDuration.

import { Observable, timer } from 'rxjs';
import { mergeMap } from 'rxjs/operators';

export const infiniteRetryStrategy = (opts: { scalingDuration: number, maxDuration: number }, attempt?: () => number) => (attempts: Observable<{}>) => {
    return attempts.pipe(
        mergeMap((error, i) => {
            // use attempt if we have it, otherwise use the number of errors received
            const retryAttempt = attempt ? attempt() : i + 1;
            const waitDuration = Math.min(opts.maxDuration, retryAttempt * opts.scalingDuration);

            console.error(`Got error: ${error}. Retrying attempt ${retryAttempt} in ${waitDuration}ms`);
            return timer(waitDuration);
        })
    );
};
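
To make the capped wait above concrete, here is a minimal sketch of just the delay arithmetic, with no RxJS involved. The helper name waitDurationFor and the BackoffOptions interface are hypothetical; the option names mirror scalingDuration and maxDuration from the strategy.

```typescript
// Hypothetical helper: mirrors the Math.min(...) line in infiniteRetryStrategy.
interface BackoffOptions {
    scalingDuration: number; // milliseconds added per attempt
    maxDuration: number;     // upper bound on the wait, in milliseconds
}

function waitDurationFor(retryAttempt: number, opts: BackoffOptions): number {
    // grows linearly with the attempt number, but never exceeds maxDuration
    return Math.min(opts.maxDuration, retryAttempt * opts.scalingDuration);
}

const backoffOpts: BackoffOptions = { scalingDuration: 1000, maxDuration: 10000 };
const durations = [1, 2, 3, 10, 11, 50].map(n => waitDurationFor(n, backoffOpts));
console.log(durations); // [1000, 2000, 3000, 10000, 10000, 10000]
```

With these values the wait stops growing at the tenth attempt.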

Usage is pretty straightforward. If you provide the attempt function, the retry count is managed outside the stream, so you can reset it whenever the source emits a value.

import { retryWhen, tap } from 'rxjs/operators';

class Connection {
    private reconnectionAttempt = 0;

    public connect() {
        // replace myConnectionObservable$ with your own observable
        myConnectionObservable$.pipe(
            retryWhen(
                infiniteRetryStrategy({
                    scalingDuration: 1000,
                    maxDuration: 10000
                }, () => ++this.reconnectionAttempt)
            ),
            tap(() => this.reconnectionAttempt = 0)
        ).subscribe(
            (cmd) => console.log(cmd),
            (err) => console.error(err)
        );
    }
}
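
The effect of the external counter can be traced without a real socket. The following is only a sketch under simplifying assumptions: the function simulateDelays and the SocketEvent type are hypothetical, and a plain array of events stands in for the observable. Each 'error' plays the role of the attempt callback, and each 'value' plays the role of the tap that resets the counter.

```typescript
// Hypothetical stand-in for the Connection class state; no RxJS involved.
type SocketEvent = 'error' | 'value';

function simulateDelays(
    events: SocketEvent[],
    scalingDuration: number,
    maxDuration: number
): number[] {
    let reconnectionAttempt = 0; // same role as this.reconnectionAttempt
    const delays: number[] = [];
    for (const event of events) {
        if (event === 'error') {
            // what the attempt callback does: () => ++this.reconnectionAttempt
            const attempt = ++reconnectionAttempt;
            delays.push(Math.min(maxDuration, attempt * scalingDuration));
        } else {
            // what tap(() => this.reconnectionAttempt = 0) does on each value
            reconnectionAttempt = 0;
        }
    }
    return delays;
}

const simulated = simulateDelays(
    ['error', 'error', 'error', 'value', 'error', 'error'],
    1000,
    10000
);
console.log(simulated); // [1000, 2000, 3000, 1000, 2000]
```

Note that in the real pipeline the reset happens when the source emits its first value after reconnecting, not at the moment the socket opens.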

This isn't perfect, since you need to keep state outside of the stream, but it's the only way I managed to do it. Any cleaner solution is welcome.

Upvotes: 1

Shane L

Reputation: 1

I recently solved this by using a helper class for the retryWhen handler. This was in RxJava rather than RxJS, but I assume the concept is the same. In my case I wanted to error out if I exceeded my maximum retries, so you may need to tweak that part to do what you need.

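private static class RetryWhenHandler implements Function<Observable<? extends Throwable>, Observable<?>> {
    int retryCount = 0;

    @Override
    public Observable<?> apply(Observable<? extends Throwable> attempts) throws Exception {
        // apply() runs once per subscription, so the per-error logic goes inside flatMap
        return attempts.flatMap(throwable -> {
            if (++retryCount <= MAX_RETRIES) {
                // wait retryCount seconds before the next attempt
                return Observable.timer(retryCount, TimeUnit.SECONDS);
            } else {
                // retries exhausted: propagate the original error
                return Observable.error(throwable);
            }
        });
    }
}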

Then, in your example above:

RetryWhenHandler myHandler = new RetryWhenHandler();

socketResponse
    .doOnNext(ignored -> myHandler.retryCount = 0)
    .retryWhen(myHandler);

This ought to reset your retryCount any time you get an update from your observable so that it's 0 next time you hit an error.

Upvotes: 0
