Reputation: 785
I am upgrading to rxjava2, we have the code to poll the data from server, the code handles to retry with delays when there is a network issue. However, somehow when i was trying to migrate to rxjava2, the code stops working. Here is the code for Rxjava1, and its working perfectly , basically followed this http://blog.danlew.net/2015/03/02/dont-break-the-chain/ and this https://medium.com/@v.danylo/server-polling-and-retrying-failed-operations-with-retrofit-and-rxjava-8bcc7e641a5a
.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
@Override
public Observable<?> call(final Observable<? extends Throwable> observable) {
// wrap a flatmap here so that i can check the exception type
return observable.flatMap(new Func1<Throwable, Observable<?>>() {
@Override
public Observable<?> call(Throwable throwable) {
mThrowable = throwable;
if (throwable instanceof IOException) {
return observable.compose(timerWithRetries());
} else {
// for other errors, call onError and exit
return Observable.error(throwable);
}
}
});
}
})
private <T> Observable.Transformer<T, Long> timerWithRetries() {
return new Observable.Transformer<T, Long>() {
@Override
public Observable<Long> call(Observable<T> observable) {
return observable
.zipWith(Observable.range(COUNTER_START, MAX_RETRIES + 1),
new Func2<T, Integer, Integer>() {
@Override
public Integer call(T t, Integer repeatAttempt) {
return repeatAttempt;
}
})
.flatMap(new Func1<Integer, Observable<Long>>() {
@Override
public Observable<Long> call(Integer repeatAttempt) {
if (repeatAttempt == MAX_RETRIES + 1) {
if (mThrowable instanceof IOException) {
// Custom Exception
throw new Exception();
}
}
// increase the waiting time
return Observable.timer(repeatAttempt * mDelaySeconds, TimeUnit.SECONDS);
}
});
}
};
}
I want to wrap the error with flatmap so that i can check the exception type and when it reaches to the maximum retries, i can pass my custom exception to onError.
However, when just use Rxjava2, the timerWithRetries() method stops working, this method is called but the .zipWith() and its flatmap are not executed.
But its working without the flatmap when wrapping the error, which is very weird. Something like
.retryWhen(error -> error.compose(timerWithRetries()))
Much appreciated with any suggestions!
Upvotes: 0
Views: 2819
Reputation: 785
I finally managed to find a workaround solution. Use a delay() instead of using zipWith() and flatmap().
AtomicInteger retryCounter = new AtomicInteger(0);
.retryWhen(error -> error.flatmap(e -> {
if (e instanceof HttpException) {
// code that deals with specific exception
int retries = retryCounter.increaseAndGet();
if (retries < MAX_RETRIES) {
// Key point here, uses .delay()
return Observable.just(new Object()).delay(delaySeconds, SECOND);
}
}
}))
Upvotes: 0
Reputation: 69997
1.x retryWhen
used a BehaviorSubject
that hold onto the last Throwable and was replayed when there was a new subscriber. This was mainly due to its "weird" implementation by trying to support most retry
and repeat
operators.
2.x uses PublishSubject
and is generally subscribed to exactly once (not composed over again). Only observers at the time of the failure will receive the error value but not anything that comes right after the error emission.
Actually, observable.compose(timerWithRetries());
is not fully correct because you keep adding observers to the subject without cleaning the previous ones up.
The last case works because you build on the primary error source with a counted-flatMapped handler that emits as a response to the original error.
Upvotes: 2