Graham Smith
Graham Smith

Reputation: 25757

RxJava Zip Operator Error Handling with Iterator

Background

I have a process that uses RxJava to get data from different locations based on a list. Each item is got through a different method (all returning Observables.). Due to having N items to get the logic operator to use is zip with an iterator.

The Problem

The code below works as expected but it seems "wrong" that I need a try-catch block to catch the Exception that is thrown by getBigFoo() - that returns a FooNotFoundException. Do not the other error related operators cover this, such as onErrorResumeNext() and onErrorReturn()?

private Observable<Bar> processFoos(List<Foo> foos) {

        List<Observable<? extends IBar>> observablesToZip = new ArrayList<>();

        for(Foo foo : foos) {

            switch (foo.getType()) {

                case BIG_FOO :

                    try {
                        observablesToZip.add(getBigFoo(foo.getId()));
                    } catch (Exception exception) {
                        //do nothing - but this seems wrong
                    }
            }
        }

        return Observable.zip(observablesToZip, results -> mergeFoosIntoBar(results));
    }

Attempts Made

The attempt below doesn't seem to catch the Exception generated. I don't understand why as there are technically no upstream or downstream items in the sequence, so Observable.empty() should work?

private Observable<Bar> processFoos(List<Foo> foos) {

        List<Observable<? extends IBar>> observablesToZip = new ArrayList<>();

        for(Foo foo : foos) {

            switch (foo.getType()) {

                case BIG_FOO :
                   observablesToZip.add(getBigFoo(foo.getId().onErrorResumeNext(Observable.empty()));
            }
        }

        return Observable.zip(observablesToZip, results -> mergeFoosIntoBar(results));
    }

Upvotes: 2

Views: 1353

Answers (3)

Graham Smith
Graham Smith

Reputation: 25757

@dwursteisen was on to the right answer, but wasn't quite there.

My issue was that I was throwing a new FooNotFoundException:

throw new FooNotFoundException()

But what I needed to do was:

return Observable.error(new FooNotFoundException());

Then in my Zip function:

observablesToZip.add(getBigFoo(foo.getId())).onExceptionResumeNext(Observable.just(null);

Using the above combination means that the overall sequence does not abort and return an error when the individual Observables are resolved and potentially throw errors.

Upvotes: 1

paul
paul

Reputation: 13471

Could you make getBigFoo(foo.getId()) throw RuntimeException instead Exception?. All Exceptions on Pipeline must be captured, but not runtimeExceptions.

Take a look to this silly example

         /**
 * Here is a silly example how runtimeExceptions are not needed
 */
@Test
public void observableRuntimeException() {
    Integer[] numbers = {0, 1, 2, 3, 4, 5};

    Observable.from(numbers)
              .doOnNext(number -> throwRuntimeException())
              .doOnError(t -> System.out.println("Expecting illegal argument exception:" + t.getMessage()))
              .subscribe();

}

private void throwRuntimeException() {
    throw new RuntimeException();
}

you can see more examples here https://github.com/politrons/reactive

Upvotes: 0

dwursteisen
dwursteisen

Reputation: 11515

You may want to use defer. getBigFoo should not throw an exception but instead return an Observable in error. So defer may help you to fix it :

Observable<IBar> obs = Observable.defer(() -> {
             try {
                 return getBigFoo(foo.getId());
              } catch (Exception ex) {
                  return Observable.error(ex);
              } 
});
observablestoZip.add(obs);

Upvotes: 1

Related Questions