Bick

Reputation: 18541

rxjava - How to handle merge exceptions without terminating the whole process

I have created two observables.
One of them throws an exception.

obs1 = Observable.from(new Integer[]{1, 2, 3, 4, 5, 6});

obs2 = Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override public void call(Subscriber<? super Integer> subscriber) {
        boolean b = getObj().equals(""); // this throws an exception
        System.out.println("1");
    }
});

Now I invoke them using

Observable.merge(obs2, obs1)
          .subscribe(new Observer<Integer>() {
                @Override public void onCompleted() {
                    System.out.println("onCompleted");
                }

                @Override public void onError(Throwable throwable) {
                    System.out.println("onError");
                }

                @Override public void onNext(Integer integer) {
                    System.out.println("onNext - " + integer);
                }
            });

Now, I don't want my process to halt completely when an exception occurs.
I want to handle it and let obs1 continue its work.

I have tried onErrorResumeNext(), onExceptionResumeNext(), and doOnError(), but nothing helped: obs1 still did not run.

How can I handle the exception without stopping the other observable from being processed?

Upvotes: 2

Views: 1612

Answers (2)

akarnokd

Reputation: 70007

Sounds like you need mergeDelayError.
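
In the question's code that would mean calling Observable.mergeDelayError(obs2, obs1) instead of Observable.merge(obs2, obs1). The idea is that an error from one source is held back until the other sources have finished, and only then delivered to onError. To illustrate the semantics without pulling in the library, here is a minimal plain-Java model. It is not RxJava code: Source, Result, and DelayErrorMergeSketch are hypothetical names made up for this sketch, sources run one after another rather than concurrently, and only the first error is kept (RxJava may combine several errors into one).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java model of "delay error" merging. This is NOT RxJava code;
// every type and method name here is hypothetical.
class DelayErrorMergeSketch {

    /** A source pushes values into the sink and may throw part-way through. */
    interface Source {
        void emit(Consumer<Integer> sink);
    }

    /** Everything that was emitted, plus the error that was held back (if any). */
    static final class Result {
        final List<Integer> values = new ArrayList<>();
        RuntimeException error; // stays null when every source finished normally
    }

    /** Runs every source; a failure is remembered and reported only at the end. */
    static Result mergeDelayError(List<Source> sources) {
        Result result = new Result();
        for (Source source : sources) {
            try {
                source.emit(result.values::add);
            } catch (RuntimeException e) {
                if (result.error == null) {
                    result.error = e; // keep the first failure, keep draining the rest
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Stand-in for obs2: fails before emitting anything.
        Source failing = sink -> { throw new NullPointerException("getObj() returned null"); };
        // Stand-in for obs1: emits 1..6.
        Source numbers = sink -> { for (int i = 1; i <= 6; i++) sink.accept(i); };

        Result result = mergeDelayError(Arrays.asList(failing, numbers));
        result.values.forEach(v -> System.out.println("onNext - " + v));
        System.out.println(result.error == null ? "onCompleted" : "onError");
    }
}
```

Running main prints "onNext - 1" through "onNext - 6" followed by "onError". The subscriber still sees the error, only after the healthy source has been drained. If you would rather swallow the error entirely, recovering on obs2 itself (for example with onErrorResumeNext) is the other route.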

Upvotes: 4

dwursteisen

Reputation: 11515

The problem is in the function you pass to Observable.create: it lets the exception escape instead of reporting it. You should catch the exception and call onError on the subscriber. Otherwise, you break the Rx contract.

Example:

    Observable<Integer> obs1 = Observable.from(Arrays.asList(1, 2, 3, 4, 5, 6));

    Observable<Integer> obs2 = Observable.create((Subscriber<? super Integer> subscriber) -> {
        subscriber.onError(new NullPointerException());
    });

    Observable.merge(obs2.onErrorResumeNext((e) -> Observable.empty()), obs1)
            .subscribe(new Observer<Integer>() {
                @Override public void onCompleted() {
                    System.out.println("onCompleted");
                }

                @Override public void onError(Throwable throwable) {
                    System.out.println("onError");
                }

                @Override public void onNext(Integer integer) {
                    System.out.println("onNext - " + integer);
                }
            });

So if you replace your obs2 code with the following, and keep the onErrorResumeNext call on obs2 from the merge above, it should work as you expected:

obs2 = Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override public void call(Subscriber<? super Integer> subscriber) {
        try {
            boolean b = getObj().equals(""); // this throws an exception
            System.out.println("1");
        } catch (Exception ex) {
            // Report the failure through onError instead of letting it escape.
            subscriber.onError(ex);
        }
    }
});

Upvotes: 0
