Reputation: 93
I want to trigger retrywhen() with increasing time interval,
socketResponse.retryWhen(attempts => {
return attempts.zip(Observable.range(1, 4)).mergeMap(([error, i]) => {
console.log(`[socket] Wait ${i} seconds, then retry!`);
if (i === 4) {
console.log(`[socket] maxReconnectAttempts ${i} reached!`);
}
return Observable.timer(i * 1000);
});
});
above code works fine. current implementation output :
on connection error (1st time)
on connection successful
// successful connection.
on connection error (2nd time)
Now I want to reset waiting time when socket connection is successful.
desired output :
on connection error (1st time)
on connection successful
// successful connection.
on connection error (2nd time)
[socket] Wait 1 seconds, then retry! // wait for 1 second
[socket] Wait 2 seconds, then retry! // wait for 2 seconds
but I don't know how to reset retrywhen() time interval.
Upvotes: 7
Views: 2139
Reputation: 81
Use delayWhen
to create a notifier function that emits on increasing delays after each error occurrence. In below snippet, I'm retrying on increasing multiples of 0.5 sec. However, you can use a different time series, if desired. Here, I also show how to limit the number of retries and throw an error in that case.
socketResponse$
.pipe(
retryWhen(errors =>
errors.pipe(
// delay(500), // fix delay: 0.5 sec
delayWhen((_,i) => timer(i*500)), // increasing delay: 0.5 sec multiple
tap(_ => console.log('retrying...')),
take(2), // max. retries: 2 x
concat(throwError('number of retries exceeded'))
)
)
)
.subscribe(
x => console.log('value:', x),
e => console.error('error:', e)
);
Upvotes: 0
Reputation: 1148
I also had the same problem. My goal was to wrap a socket connection with observables. I wanted the network observable to never complete (unless specifically asked to) and infinitely retry connecting in case of any error.
Here's how I accomplished this (with rxjs version 6+). This my retryWhen
operator, retrying infinitely while adding a scaling wait duration scalingDuration
topped at maxDuration
.
import { Observable, timer } from 'rxjs';
import { mergeMap } from 'rxjs/operators';
export const infiniteRetryStrategy = (opts: { scalingDuration: number, maxDuration: number }, attempt?: () => number) => (attempts: Observable<{}>) => {
return attempts.pipe(
mergeMap((error, i) => {
// use attempt if we have it, otherwise use the number of errors received
const retryAttempt = attempt ? attempt() : i + 1;
const waitDuration = Math.min(opts.maxDuration, retryAttempt * opts.scalingDuration);
console.error(`Got error: ${error}. Retrying attempt ${retryAttempt} in ${waitDuration}ms`);
return timer(waitDuration);
})
);
};
Usage is pretty straight forward. If you provide the attempt
function, you can manage the retry number externally and you can therefore reset it on a next tick.
import { retryWhen, tap } from 'rxjs/operators';
class Connection {
private reconnectionAttempt = 0;
public connect() {
// replace myConnectionObservable$ by your own observale
myConnectionObservable$.pipe(
retryWhen(
infiniteRetryStrategy({
scalingDuration: 1000,
maxDuration: 10000
}, () => ++this.reconnectionAttempt)
),
tap(() => this.reconnectionAttempt = 0)
).subscribe(
(cmd) => console.log(cmd),
(err) => console.error(err)
);
}
}
This isn't perfect as you need to keep state outside of the stream but it's the only way I managed to do it. Any cleaner solution is welcomed.
Upvotes: 1
Reputation: 1
I recently solved this by using a helper class for the retryWhen handler. This was in RxJava rather than RxJS, but I assume the concept is the same. In my case I wanted to error out if I exceeded my maximum retries, so you may need to tweak that part to do what you need.
private static class RetryWhenHandler implements Function<Observable<? extends Throwable>, Observable<?>> {
int retryCount = 0;
@Override
public Observable<?> apply(Observable<? extends Throwable> attempts) throws Exception {
if (++retryCount <= MAX_RETRIES) {
return Observable.timer(i, TimeUnit.Seconds);
} else {
return Observable.error(throwable);
}
}
}
Then, in your example above:
RetryWhenHandler myHandler = new RetryWhenHandler();
socketResponse
.doOnNext(ignored -> myHandler.retryCount = 0)
.retryWhen(myHandler);
This ought to reset your retryCount any time you get an update from your observable so that it's 0 next time you hit an error.
Upvotes: 0