x-ray
x-ray

Reputation: 3329

RxJava: Conditionally catch error and stop propagation

I use Retrofit with RxJava Observables and lambda expressions. I'm new to RxJava and cannot find out how to do the following:

Observable<ResponseBody> res = api.getXyz();
res.subscribe(response -> {
    // I don't need the response here
}, error -> {
    // I might be able to handle an error here. If so, it shall not go to the second error handler.
});
res.subscribe(response -> {
    // This is where I want to process the response
}, error -> {
    // This error handler shall only be invoked if the first error handler was not able to handle the error.
});

I looked at the error handling operators, but I don't understand how they can help me with my usecase.

Upvotes: 3

Views: 3665

Answers (1)

david.mihola
david.mihola

Reputation: 12992

Method 1: Keep the two Subscribers but cache the Observable.

Just keep everything as it is now, but change the first line to:

Observable<ResponseBody> res = api.getXyz().cache();

The cache will make sure that the request is only sent once but that sill both Subscribers get all the same events.

This way whether and how you handle the error in the first Subscriber does not affect what the second Subscriber sees.

Method 2: Catch some errors with onErrorResumeNext but forward all others.

Add onErrorResumeNext to your Observable to produce something like this (in the "inner" object):

Observable observable = Observable.error(new IllegalStateException())
.onErrorResumeNext(new Func1<Throwable, Observable<?>>() {
    @Override
    public Observable<?> call(Throwable throwable) {
        if (throwable instanceof NumberFormatException) {
            System.out.println("NFE - handled");
            return Observable.empty();
        } else {
            System.out.println("Some other exception - panic!");
            return Observable.error(throwable);
        }
    }
});

And only subscribe once (in the "outer" object):

observable.subscribe(new Subscriber() {
    @Override
    public void onCompleted() {
        System.out.println("onCompleted");
    }

    @Override
    public void onError(Throwable e) {
        System.out.println("onError");
        e.printStackTrace();
    }

    @Override
    public void onNext(Object o) {
        System.out.println(String.format("onNext: %s", String.valueOf(o)));
    }    

});

This way, the error is only forwarded if it cannot be handled in the onErrorResumeNext - if it can, the Subscriber will only get a call to onCompleted and nothing else.

Having side effects in onErrorResumeNext makes me a bit uncomfortable, though. :-)

EDIT: Oh, and if you want to be extra strict, you could use Method 3: Wrap every case in a new object.

public abstract class ResultOrError<T> {
}

public final class Result<T> extends ResultOrError<T> {
    public final T result;

    public Result(T result) {
        this.result = result;
    }
}

public final class HandledError<T> extends ResultOrError<T> {
    public final Throwable throwable;

    public Result(Throwable throwable) {
        this.throwable = throwable;
    }
}

public final class UnhandledError<T> extends ResultOrError<T> {
    public final Throwable throwable;

    public Result(Throwable throwable) {
        this.throwable = throwable;
    }
}

And then:

  • Wrap proper results in Result (using map)
  • Wrap handle-able errors in HandledError and
  • un-handle-able errors in UnhandledError (using onErrorResumeNext with an if clause)
  • handle the HandledErrors (using doOnError)
  • have a Subscriber<ResultOrError<ResponseBody>> - it will get notifications (onNext) for all three types but will just ignore the HandledErrors and handle the other two types.

Upvotes: 1

Related Questions