Reputation: 25757
Background
I have a process that uses RxJava to get data from different locations based on a list. Each item is got through a different method (all returning Observables.). Due to having N items to get the logic operator to use is zip
with an iterator.
The Problem
The code below works as expected but it seems "wrong" that I need a try-catch
block to catch the Exception that is thrown by getBigFoo()
- that returns a FooNotFoundException
. Do not the other error related operators cover this, such as onErrorResumeNext()
and onErrorReturn()
?
private Observable<Bar> processFoos(List<Foo> foos) {
List<Observable<? extends IBar>> observablesToZip = new ArrayList<>();
for(Foo foo : foos) {
switch (foo.getType()) {
case BIG_FOO :
try {
observablesToZip.add(getBigFoo(foo.getId()));
} catch (Exception exception) {
//do nothing - but this seems wrong
}
}
}
return Observable.zip(observablesToZip, results -> mergeFoosIntoBar(results));
}
Attempts Made
The attempt below doesn't seem to catch the Exception generated. I don't understand why as there are technically no upstream or downstream items in the sequence, so Observable.empty()
should work?
private Observable<Bar> processFoos(List<Foo> foos) {
List<Observable<? extends IBar>> observablesToZip = new ArrayList<>();
for(Foo foo : foos) {
switch (foo.getType()) {
case BIG_FOO :
observablesToZip.add(getBigFoo(foo.getId().onErrorResumeNext(Observable.empty()));
}
}
return Observable.zip(observablesToZip, results -> mergeFoosIntoBar(results));
}
Upvotes: 2
Views: 1353
Reputation: 25757
@dwursteisen was on to the right answer, but wasn't quite there.
My issue was that I was throwing a new FooNotFoundException:
throw new FooNotFoundException()
But what I needed to do was:
return Observable.error(new FooNotFoundException());
Then in my Zip function:
observablesToZip.add(getBigFoo(foo.getId())).onExceptionResumeNext(Observable.just(null);
Using the above combination means that the overall sequence does not abort and return an error when the individual Observables are resolved and potentially throw errors.
Upvotes: 1
Reputation: 13471
Could you make getBigFoo(foo.getId()) throw RuntimeException instead Exception?. All Exceptions on Pipeline must be captured, but not runtimeExceptions.
Take a look to this silly example
/**
* Here is a silly example how runtimeExceptions are not needed
*/
@Test
public void observableRuntimeException() {
Integer[] numbers = {0, 1, 2, 3, 4, 5};
Observable.from(numbers)
.doOnNext(number -> throwRuntimeException())
.doOnError(t -> System.out.println("Expecting illegal argument exception:" + t.getMessage()))
.subscribe();
}
private void throwRuntimeException() {
throw new RuntimeException();
}
you can see more examples here https://github.com/politrons/reactive
Upvotes: 0
Reputation: 11515
You may want to use defer
. getBigFoo
should not throw an exception but instead return an Observable
in error. So defer
may help you to fix it :
Observable<IBar> obs = Observable.defer(() -> {
try {
return getBigFoo(foo.getId());
} catch (Exception ex) {
return Observable.error(ex);
}
});
observablestoZip.add(obs);
Upvotes: 1