Cheng
Cheng

Reputation: 785

retry / repeat with delay not working in RxJava2

I am upgrading to rxjava2, we have the code to poll the data from server, the code handles to retry with delays when there is a network issue. However, somehow when i was trying to migrate to rxjava2, the code stops working. Here is the code for Rxjava1, and its working perfectly , basically followed this http://blog.danlew.net/2015/03/02/dont-break-the-chain/ and this https://medium.com/@v.danylo/server-polling-and-retrying-failed-operations-with-retrofit-and-rxjava-8bcc7e641a5a

.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
                @Override
                public Observable<?> call(final Observable<? extends Throwable> observable) {
                    // wrap a flatmap here so that i can check the exception type
                    return observable.flatMap(new Func1<Throwable, Observable<?>>() {
                        @Override
                        public Observable<?> call(Throwable throwable) {
                            mThrowable = throwable;
                            if (throwable instanceof IOException) {
                                return observable.compose(timerWithRetries());
                            } else {
                                // for other errors, call onError and exit
                                return Observable.error(throwable);
                            }
                        }
                    });
                }
            })



private <T> Observable.Transformer<T, Long> timerWithRetries() {
    return new Observable.Transformer<T, Long>() {

        @Override
        public Observable<Long> call(Observable<T> observable) {
            return observable
                    .zipWith(Observable.range(COUNTER_START, MAX_RETRIES + 1),
                            new Func2<T, Integer, Integer>() {
                                @Override
                                public Integer call(T t, Integer repeatAttempt) {
                                    return repeatAttempt;
                                }
                            })
                    .flatMap(new Func1<Integer, Observable<Long>>() {
                        @Override
                        public Observable<Long> call(Integer repeatAttempt) {
                            if (repeatAttempt == MAX_RETRIES + 1) {
                                if (mThrowable instanceof IOException) {
                                  // Custom Exception
                                   throw new Exception();
                                }
                            }
                            // increase the waiting time
                            return Observable.timer(repeatAttempt * mDelaySeconds, TimeUnit.SECONDS);
                        }
                    });
        }
    };
}

I want to wrap the error with flatmap so that i can check the exception type and when it reaches to the maximum retries, i can pass my custom exception to onError.

However, when just use Rxjava2, the timerWithRetries() method stops working, this method is called but the .zipWith() and its flatmap are not executed.

But its working without the flatmap when wrapping the error, which is very weird. Something like

.retryWhen(error -> error.compose(timerWithRetries()))

Much appreciated with any suggestions!

Upvotes: 0

Views: 2819

Answers (2)

Cheng
Cheng

Reputation: 785

I finally managed to find a workaround solution. Use a delay() instead of using zipWith() and flatmap().

 AtomicInteger retryCounter = new AtomicInteger(0);

.retryWhen(error -> error.flatmap(e -> {
     if (e instanceof HttpException) {
         // code that deals with specific exception
        int retries = retryCounter.increaseAndGet();
        if (retries < MAX_RETRIES) {
            // Key point here, uses .delay()
            return Observable.just(new Object()).delay(delaySeconds, SECOND);
        }
     }
 }))

Upvotes: 0

akarnokd
akarnokd

Reputation: 69997

1.x retryWhen used a BehaviorSubject that hold onto the last Throwable and was replayed when there was a new subscriber. This was mainly due to its "weird" implementation by trying to support most retry and repeat operators.

2.x uses PublishSubject and is generally subscribed to exactly once (not composed over again). Only observers at the time of the failure will receive the error value but not anything that comes right after the error emission.

Actually, observable.compose(timerWithRetries()); is not fully correct because you keep adding observers to the subject without cleaning the previous ones up.

The last case works because you build on the primary error source with a counted-flatMapped handler that emits as a response to the original error.

Upvotes: 2

Related Questions