Egor Neliuba

Reputation: 15054

RxJava onErrorResumeNext scheduler

I have an observable that might fail with a specific exception, in which case I want to show a dialog with a retry button. I've seen this answer, but it doesn't quite do what I want. I couldn't solve my problem with retryWhen, so I used onErrorResumeNext instead. If you can come up with a way to do the same with retryWhen, please let me know.

Right now I have this piece of code:

public Observable<Order> proceedWithOrdering(Activity activity) {
    return apiService.createOrder()
            .subscribeOn(Schedulers.io())
            .compose(applyRetryLogic(activity))
            .subscribeOn(AndroidSchedulers.mainThread())
            .observeOn(AndroidSchedulers.mainThread());
}

public <T extends ApiResponse> Observable.Transformer<T, T> applyRetryLogic(Activity activity) {
    return observable -> observable
            .onErrorResumeNext(retry(observable, activity))
            .subscribeOn(AndroidSchedulers.mainThread());
}

public <T> Func1<Throwable, ? extends Observable<? extends T>> retry(Observable toRetry, Activity activity) {
    return throwable -> {
        if (throwable instanceof NetworkException) {
            MaterialDialog dialog = retryDialog(activity);
            View retry = dialog.getActionButton(DialogAction.POSITIVE);
            View cancel = dialog.getActionButton(DialogAction.NEGATIVE);
            Observable<Object> retryClick = RxView.clicks(retry).map(o -> {
                dialog.dismiss();
                return o;
            });
            Observable<Object> cancelClick = RxView.clicks(cancel).flatMap(o -> {
                dialog.dismiss();
                return Observable.error(throwable);
            });

            dialog.show();

            return Observable.amb(retryClick, cancelClick)
                    .flatMap(o -> toRetry.compose(applyRetryLogic(activity)));
        } else {
            return Observable.error(throwable);
        }
    };
}

The problem is that the code inside retry does not run on the main thread, so it throws the "Can't create handler inside thread that has not called Looper.prepare()" exception.

The question is: how do I force it to run on the main thread? As you can see, I have already tried adding subscribeOn(AndroidSchedulers.mainThread()) right after both compose and onErrorResumeNext, with no luck.

I have tested my code using simple observables that don't operate on separate threads and it works fine.

Upvotes: 4

Views: 1683

Answers (2)

akarnokd

Reputation: 69997

You can accomplish this with retryWhen by flatMapping each error to a PublishSubject that emits once the relevant button is pressed. Here is a classic Java Swing example:

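import java.util.concurrent.atomic.AtomicInteger;

import javax.swing.JOptionPane;
import javax.swing.SwingUtilities;

import rx.Observable;
import rx.subjects.PublishSubject;

public class RetryWhenEnter {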
    public static void main(String[] args) {
        AtomicInteger d = new AtomicInteger();
        Observable<Integer> source = Observable.just(1);

        source.flatMap(v -> {
            if (d.incrementAndGet() < 3) {
                return Observable.error(new RuntimeException());
            }
            return Observable.just(v);
        })
        .retryWhen(err -> {
            return err.flatMap(e -> {
                System.out.println(Thread.currentThread() + " Error!");
                PublishSubject<Integer> choice = PublishSubject.create();
                SwingUtilities.invokeLater(() -> {
                    int c = JOptionPane.showConfirmDialog(null, 
                        e.toString() + "\r\nRetry?", "Error",
                        JOptionPane.YES_NO_OPTION);
                    if (c == JOptionPane.YES_OPTION) {
                        choice.onNext(1);
                    } else {
                        choice.onCompleted();
                    }
                });
                return choice;
            });
        }).subscribe(System.out::println, 
                Throwable::printStackTrace);
    }
}

Edit:

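Alternatively, apply observeOn(AndroidSchedulers.mainThread()) just before onErrorResumeNext or, when using retryWhen, inside it: retryWhen(o -> o.observeOn(AndroidSchedulers.mainThread())...). The extra subscribeOn calls have no effect here because subscribeOn only chooses the thread on which the source is subscribed, whereas observeOn moves the downstream callbacks, including the error that triggers onErrorResumeNext.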
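To see the same idea without Android or RxJava on the classpath, here is a rough JDK-only analogy, not the RxJava API itself. It assumes a single named executor thread ("fake-main", a hypothetical stand-in for the Android main thread) and uses CompletableFuture.handleAsync to play the role of observeOn placed before the error handler: the work fails on a background thread, but the handler runs on the designated thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class HandlerThreadSketch {
    // Hypothetical stand-in for the Android main thread: one named daemon thread.
    static final ExecutorService UI = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "fake-main");
        t.setDaemon(true); // do not keep the JVM alive for this sketch
        return t;
    });

    // Fails on a background (common pool) thread, then handles the failure
    // on UI and returns the name of the thread that ran the handler.
    static String handlerThreadName() {
        CompletableFuture<String> failing = CompletableFuture.supplyAsync(() -> {
            throw new IllegalStateException("network down");
        });
        return failing
                // Analogue of observeOn(mainThread) before onErrorResumeNext:
                // the handler is dispatched to UI, not to the failing thread.
                .handleAsync((value, error) -> Thread.currentThread().getName(), UI)
                .join();
    }

    public static void main(String[] args) {
        System.out.println(handlerThreadName());
    }
}
```

In the RxJava chain, the equivalent move is the observeOn call placed directly before the operator whose callback has to touch the UI.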

Edit 2: I've rolled back the change so the answer is meaningful again.

Upvotes: 3

Egor Neliuba

Reputation: 15054

There is a way of solving my problem using retryWhen (thanks @akarnokd):

public <T extends ApiResponse> Observable.Transformer<T, T> applyRetryLogic(Activity activity) {
    return observable -> observable
            .retryWhen(err -> err.flatMap(throwable -> {
                L.d(Thread.currentThread() + " Error!");
                if (throwable instanceof NetworkException) {
                    PublishSubject<Integer> choice = PublishSubject.create();
                    activity.runOnUiThread(() -> {
                        MaterialDialog dialog = retryDialog(activity);
                        View retry = dialog.getActionButton(DialogAction.POSITIVE);
                        View cancel = dialog.getActionButton(DialogAction.NEGATIVE);
                        RxView.clicks(retry).subscribe(o -> {
                            dialog.dismiss();
                            choice.onNext(1);
                        });
                        RxView.clicks(cancel).subscribe(o -> {
                            dialog.dismiss();
                            choice.onError(throwable);
                        });

                        dialog.show();
                    });
                    return choice;
                } else {
                    return Observable.error(throwable);
                }
            }));
}
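For anyone trying to follow the control flow, the transformer above boils down to "on failure, wait for the user's choice, then either resubscribe or propagate the error". Below is a simplified, blocking JDK-only sketch of that flow. It is an illustration under assumptions, not the RxJava mechanism: callWithRetry and askUser are hypothetical names, and a CompletableFuture<Boolean> stands in for the PublishSubject that the dialog buttons feed.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Supplier;

public class GatedRetrySketch {
    // Runs the task; on failure asks askUser and waits for the answer.
    // true means "retry", false means "cancel" and rethrows the original error.
    static <T> T callWithRetry(Supplier<T> task,
            Function<RuntimeException, CompletableFuture<Boolean>> askUser) {
        while (true) {
            try {
                return task.get();
            } catch (RuntimeException e) {
                // Blocks until the "dialog" answers, so this must not be
                // called on the UI thread itself.
                if (!askUser.apply(e).join()) {
                    throw e;
                }
            }
        }
    }

    // Demo: a task that fails twice, with a user who always picks "retry".
    // Returns how many times the task was attempted.
    static int attemptsWhenUserAlwaysRetries() {
        AtomicInteger attempts = new AtomicInteger();
        callWithRetry(() -> {
            if (attempts.incrementAndGet() < 3) {
                throw new IllegalStateException("network down");
            }
            return "order";
        }, e -> CompletableFuture.completedFuture(true));
        return attempts.get();
    }

    public static void main(String[] args) {
        System.out.println(attemptsWhenUserAlwaysRetries());
    }
}
```

The real retryWhen version never blocks a thread: the subject emitting an item triggers the resubscription, and the subject emitting an error ends the stream with that error.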

Upvotes: 2
