Alexandr

Reputation: 1901

RxJava error handling

I have a method that creates and merges two Observable<T>'s.

private Observable<String> getData() {

    Observable<String> observable1 = Observable.just("Just string")
            .delay(2, TimeUnit.SECONDS);

    Observable<String> observable2 = Observable.create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
            subscriber.onError(new Exception("Oops!"));
        }
    });

    Observable<String> merged = observable1.mergeWith(observable2);

    return merged;        
}

The problem is that the error from observable2 is emitted before the data from observable1.

getData().subscribe(new Observer<String>() {
    @Override
    public void onCompleted() {
        Log.d("rx", "onCompleted");
    }

    @Override
    public void onError(Throwable e) {
        Log.d("rx", "onError: " + e.getMessage());
    }

    @Override
    public void onNext(String s) {
        Log.d("rx", "onNext: " + s);
    }
});

observable2 emits optional data, and errors from observable2 must not terminate the emission. On the other hand, I want to handle all errors and data in a single observer. Can I deliver this error after the result from observable1? Is that possible?

Expected results:

onNext: Just string
onError: Oops!

Upvotes: 2

Views: 255

Answers (1)

AndroidEx

Reputation: 15824

Sounds like you need something like Observable.mergeDelayError().

It performs a merge, but if any of the Observables calls onError, it doesn't terminate right away; it captures the error and keeps delivering items from the other Observables.

Once all the other Observables have terminated, the captured errors are propagated. If more than one was captured, they are combined into a single CompositeException.
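Applied to the getData() from the question, it could look roughly like the sketch below. This assumes RxJava 1.x (the rx.* packages) is on the classpath. The class name MergeDelayErrorSketch and the collectEvents() helper are made up for illustration, and Observable.error() stands in for the question's Observable.create() block. Events go into a list instead of Log.d so it also runs outside Android.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import rx.Observable;
import rx.Observer;

// Hypothetical class name; assumes RxJava 1.x is available.
class MergeDelayErrorSketch {

    static Observable<String> getData() {
        Observable<String> observable1 = Observable.just("Just string")
                .delay(2, TimeUnit.SECONDS);

        // Fails immediately, like the Observable.create() block in the question.
        Observable<String> observable2 = Observable.error(new Exception("Oops!"));

        // Unlike mergeWith(), the error is held back until observable1 has terminated.
        return Observable.mergeDelayError(observable1, observable2);
    }

    // Illustrative helper: subscribes and records events in arrival order.
    static List<String> collectEvents() throws InterruptedException {
        final List<String> events = Collections.synchronizedList(new ArrayList<String>());
        final CountDownLatch done = new CountDownLatch(1);

        getData().subscribe(new Observer<String>() {
            @Override
            public void onCompleted() {
                events.add("onCompleted");
                done.countDown();
            }

            @Override
            public void onError(Throwable e) {
                events.add("onError: " + e.getMessage());
                done.countDown();
            }

            @Override
            public void onNext(String s) {
                events.add("onNext: " + s);
            }
        });

        // delay() emits on a background scheduler, so wait for the terminal event.
        done.await(10, TimeUnit.SECONDS);
        return events;
    }

    public static void main(String[] args) throws InterruptedException {
        for (String event : collectEvents()) {
            System.out.println(event);
        }
    }
}
```

The observer should see the onNext from observable1 first and the onError afterwards, which is the order asked for in the question. Keep in mind that if several sources fail, the Throwable passed to onError may be a CompositeException rather than the original exception.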

Upvotes: 3
