qed

Reputation: 23154

Use onErrorReturn with retryWhen in RxJava

Here is the code:

import io.reactivex.Observable;
import io.reactivex.Observer;
import org.junit.jupiter.api.Test;

import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;

public class RxJavaTest {

    @Test
    public void onErr() {

        Observable<String> values1 = new Observable<String>() {
            @Override
            protected void subscribeActual(Observer<? super String> observer) {
                observer.onNext("New");
                observer.onNext("New1");
                observer.onNext("New2");
                observer.onNext("New3");
                observer.onNext("New4");
                if (ThreadLocalRandom
                            .current()
                            .nextInt(10) == 5) {
                    observer.onError(new Exception("don't retry..."));
                } else {
                    observer.onError(new IllegalArgumentException("retry..."));
                }
            }
        };
        AtomicBoolean finished = new AtomicBoolean(false);
        values1
                .retryWhen(throwableObservable -> throwableObservable
                        .takeWhile(throwable -> {
                            boolean result = (throwable instanceof IllegalArgumentException);
                            if (result) {
                                System.out.println("Retry on error: " + throwable);
                                return result;
                            }
                            System.out.println("Error: " + throwable);
                            return result;
                        })
                        .take(20))
                .onErrorReturn(throwable -> "Saved the day!")
                .doOnTerminate(() -> finished.set(true))
                .subscribe(v -> System.out.println(v));
    }
}

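The goal is to (1) retry when the source fails with an IllegalArgumentException, and (2) stop retrying and fall back to a default value via onErrorReturn on any other error.

The code above accomplishes the first goal but fails at the second: it stops retrying, but the .onErrorReturn part is ignored.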

Any idea how to make it work?

Upvotes: 1

Views: 971

Answers (2)

qed

Reputation: 23154

For the record, here is my solution before I saw Gustavo's answer using onErrorResumeNext:

    private Observable<String> createObservable(long delay) {
        Observable<String> values1 = new Observable<String>() {
            @Override
            protected void subscribeActual(Observer<? super String> observer) {
                observer.onNext("New");
                observer.onNext("New1");
                observer.onNext("New2");
                observer.onNext("New3");
                observer.onNext("New4");
                if (ThreadLocalRandom
                        .current()
                        .nextInt(8) == 2) {
                    observer.onError(new RuntimeException("don't retry..."));
                } else {
                    observer.onError(new IllegalArgumentException("retry..."));
                }
            }
        };
        return Observable.timer(delay, TimeUnit.SECONDS).flatMap(aLong -> values1)
                .onErrorResumeNext((Throwable throwable) -> {
                    if (throwable instanceof IllegalArgumentException) {
                        return createObservable(delay + 2);
                    } else {
                        return Observable.just("The default value");
                    }
                });
    }

This works as expected, but I think the way Gustavo suggested is easier to understand. Here is the same function rewritten using retryWhen:

    private Observable<String> createObservable1() {
        Observable<String> values1 = new Observable<String>() {
            @Override
            protected void subscribeActual(Observer<? super String> observer) {
                observer.onNext("New");
                observer.onNext("New1");
                observer.onNext("New2");
                observer.onNext("New3");
                observer.onNext("New4");
                if (ThreadLocalRandom
                        .current()
                        .nextInt(3) == 1) {
                    observer.onError(new RuntimeException("don't retry..."));
                } else {
                    observer.onError(new IllegalArgumentException("retry..."));
                }
            }
        };
        return values1.retryWhen(throwableObservable ->
                throwableObservable
                        .zipWith(Observable.range(1, 5), (throwable, integer) -> {
                            if (throwable instanceof IllegalArgumentException) {
                                System.out.println("Retry on error: " + throwable);
                                return integer;
                            }
                            System.out.println("No retry on error: " + throwable);
                            return -1;
                        })
                        .flatMap(integer -> {
                            if (integer > 0) {
                                System.out.println("Delay " + integer + " sec on retry...");
                                return Observable.timer(integer, TimeUnit.SECONDS);
                            }
                            System.out.println("Return immediately...");
                            return Observable.error(new Exception());
                        })
        ).onErrorReturnItem("Saved the day!");
    }

Hope this helps.

Upvotes: 0

Gustavo Pagani

Reputation: 6998

You can make it work by changing your retryWhen to:

                .retryWhen(throwableObservable ->
                                throwableObservable.flatMap(throwable -> {
                                    if (throwable instanceof IllegalArgumentException) {
                                        System.out.println("Retry on error: " + throwable);
                                        return Observable.just(1);
                                    } else {
                                        System.out.println("Error: " + throwable);
                                        return Observable.<Integer>error(throwable);
                                    }
                                })
                )

To trigger a retry, it doesn't matter which value the retryWhen handler emits (the example above emits 1). To stop retrying and let onErrorReturn run, the handler has to emit an error rather than just complete. As per the Javadoc:

If that ObservableSource calls onComplete or onError then retry will call onComplete or onError on the child subscription. Otherwise, this ObservableSource will resubscribe to the source ObservableSource.
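
To make the difference concrete, here is a minimal sketch that runs both handlers against the same source. It assumes RxJava 2.x on the classpath (the io.reactivex package used in the question). The class name and the flakySource helper are hypothetical, and the source is made deterministic instead of random so the behaviour is repeatable: the first two subscriptions fail with IllegalArgumentException and the third with a plain Exception.

```java
import io.reactivex.Observable;

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class RetryWhenFallbackSketch {

    // Hypothetical source: each subscription emits one item and then fails.
    // Subscriptions 1 and 2 fail with a retryable error, subscription 3 with a fatal one.
    static Observable<String> flakySource() {
        AtomicInteger attempts = new AtomicInteger();
        return Observable.defer(() -> {
            int attempt = attempts.incrementAndGet();
            Throwable error = attempt < 3
                    ? new IllegalArgumentException("retry...")
                    : new Exception("don't retry...");
            return Observable.just("New" + attempt)
                    .concatWith(Observable.<String>error(error));
        });
    }

    // takeWhile COMPLETES the handler's stream on a non-retryable error,
    // so the downstream sees onComplete and onErrorReturn never fires.
    static List<String> withTakeWhile() {
        return flakySource()
                .retryWhen(errors -> errors.takeWhile(t -> t instanceof IllegalArgumentException))
                .onErrorReturn(t -> "Saved the day!")
                .toList()
                .blockingGet();
    }

    // flatMap re-emits the non-retryable error from the handler's stream,
    // so the downstream sees onError and onErrorReturn supplies the fallback.
    static List<String> withFlatMap() {
        return flakySource()
                .retryWhen(errors -> errors.flatMap(t -> {
                    if (t instanceof IllegalArgumentException) {
                        return Observable.just(1); // any value means "resubscribe"
                    }
                    return Observable.<Integer>error(t); // propagate: stop retrying
                }))
                .onErrorReturn(t -> "Saved the day!")
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println("takeWhile: " + withTakeWhile());
        System.out.println("flatMap:   " + withFlatMap());
    }
}
```

With this synchronous source, the takeWhile variant should end after New3 without the fallback, while the flatMap variant should append "Saved the day!" as the last item.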

Upvotes: 1
