Reputation: 11796
Suppose I have an Observable:
List<Integer> ints = Lists.newArrayList(1, 2, 0, 3, 4);
Observable<Integer> o1 = Observable.from(ints);
I want to derive another Observable that divides 12 by each element:
Observable<Integer> o2 = o1.map(i -> 12/i);
o2.subscribe(
v -> logger.info ("Subscriber value {}", v) ,
t -> logger.error("Subscriber onError {} : {}", t.getClass() , t.getMessage())
);
Obviously, it hits an error and stops when it encounters 0:
RxTest - Subscriber value 12
RxTest - Subscriber value 6
RxTest - Subscriber onError class java.lang.ArithmeticException : / by zero
But what if I want the Observer (o2) to skip the exception?
I looked into RxJava's documentation on error handling, and there seems to be no way to skip the error. onErrorResumeNext() and onExceptionResumeNext() need a backup/fallback Observable, which is not what I want. onErrorReturn needs a return value to be specified.
None of these three error-handling methods can resume the original Observable. For example:
Observable<Integer> o2 = o1.map(i -> 12/i)
.onErrorReturn(t -> 0);
It prints :
RxTest - Subscriber value 12
RxTest - Subscriber value 6
RxTest - Subscriber value 0
The remaining results, 12/3 and 12/4, are never printed.
The only solution seems to be handling the exception inside the map function:
Observable<Integer> o2 = o1.map(i -> {
    try {
        return Optional.of(12/i);
    } catch (ArithmeticException e) {
        return Optional.empty();
    }
}).filter(Optional::isPresent)
  .map(o -> (Integer) o.get());
It works, but it is cumbersome. I wonder whether there is an easy way to skip any RuntimeException when manipulating an Observable (for example, in map).
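To make the try/catch approach less repetitive, the wrapping can be pulled into a small reusable helper. The following is only a sketch: `attempt` and the `SkipErrors` class are hypothetical names, not part of RxJava, and `java.util.stream` stands in for the Observable so that the snippet runs without the library. The same wrapped function could presumably be passed to an Observable's map in the same way.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;

class SkipErrors {
    // Hypothetical helper (not an RxJava API): wraps f so that any
    // RuntimeException becomes Optional.empty() instead of propagating.
    // A null result is also mapped to Optional.empty().
    static <T, R> Function<T, Optional<R>> attempt(Function<T, R> f) {
        return t -> {
            try {
                return Optional.ofNullable(f.apply(t));
            } catch (RuntimeException e) {
                return Optional.empty();
            }
        };
    }

    // Divides 12 by each element and drops the elements that throw.
    static List<Integer> divideSkippingErrors(List<Integer> ints) {
        Function<Integer, Optional<Integer>> safeDivide = attempt(i -> 12 / i);
        return ints.stream()
                .map(safeDivide)
                .filter(Optional::isPresent)
                .map(Optional::get)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // prints [12, 6, 4, 3]
        System.out.println(divideSkippingErrors(Arrays.asList(1, 2, 0, 3, 4)));
    }
}
```

Because the target type fixes the type parameters, the `(Integer)` cast from the inline version is no longer needed.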
The above is about skipping exceptions in the Observable. The following is about skipping exceptions in the Subscriber:
The situation is the same:
List<Integer> ints = Lists.newArrayList(1, 2, 0 , 3 , 4);
Observable<Integer> o1 = Observable.from(ints);
o1.subscribe(
i -> logger.info("12 / {} = {}", i, 12 / i),
t -> logger.error("{} : {}", t.getClass() , t.getMessage()),
() -> logger.info("onCompleted")
);
It prints out :
RxTest - 12 / 1 = 12
RxTest - 12 / 2 = 6
RxTest - class java.lang.ArithmeticException : / by zero
When an exception occurs in onNext, it triggers onError, and the subscriber stops responding to any further data from the Observable. If I want the subscriber to swallow the exception, I have to try-catch the ArithmeticException in onNext(). Is there a cleaner solution?
It seems that when a Subscriber hits an error in onNext() that cannot be handled within onNext itself, it must stop, right? Is that good design?
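One way to keep the try-catch out of every onNext body is to wrap the callback once. This is a minimal sketch using only `java.util.function.Consumer`; `guarded` and `SafeConsumer` are hypothetical names, and with RxJava 1.x the same idea would have to be written against `Action1` rather than `Consumer`, which is an assumption about the library version in use.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

class SafeConsumer {
    // Hypothetical helper: forwards each value to `action` and hands any
    // RuntimeException to `onFailure` instead of letting it escape.
    static <T> Consumer<T> guarded(Consumer<T> action,
                                   Consumer<RuntimeException> onFailure) {
        return t -> {
            try {
                action.accept(t);
            } catch (RuntimeException e) {
                onFailure.accept(e);
            }
        };
    }

    // Feeds every element to a guarded callback and returns what was logged.
    static List<String> run(List<Integer> ints) {
        List<String> log = new ArrayList<>();
        Consumer<Integer> onNext = guarded(
                i -> log.add("12 / " + i + " = " + (12 / i)),
                e -> log.add("skipped " + e.getClass().getSimpleName()));
        ints.forEach(onNext);
        return log;
    }

    public static void main(String[] args) {
        run(Arrays.asList(1, 2, 0, 3, 4)).forEach(System.out::println);
    }
}
```

The failing element is reported through the second callback, and the elements after it are still processed.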
Upvotes: 10
Views: 5221
Reputation: 2247
Observable.just(1, 2, 3, 0, 4)
.flatMap(i -> Observable.defer(() -> Observable.just(12 / i))
.onErrorResumeNext(Observable.just(0)));
This is one way to go, though keep in mind that RxJava assumes an error is something truly unexpected (whereas you can expect some values to be 0). On the other hand, if you want to ignore the divide-by-zero exception, maybe you should filter/map your values before the division.
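The filter-before-division suggestion can be sketched as follows. `java.util.stream` is used here as a stand-in so the snippet runs without RxJava, and `FilterFirst` is a hypothetical name; the filter and map calls on an Observable would presumably read the same way. Unlike the flatMap version above, which substitutes 0 for the failing element, this drops the element entirely.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class FilterFirst {
    // Removes zeros first, so the division can never throw.
    static List<Integer> divide(List<Integer> ints) {
        return ints.stream()
                .filter(i -> i != 0)
                .map(i -> 12 / i)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // prints [12, 6, 4, 3]
        System.out.println(divide(Arrays.asList(1, 2, 3, 0, 4)));
    }
}
```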
Upvotes: 5
Reputation: 4027
import rx.lang.scala._
import scala.concurrent.duration.DurationDouble
import scala.util.{Failure, Success, Try}
val o1 = List(1, 2, 3, 0, 4) toObservable
val o2 = List(1, 2, 3, 4, 5) toObservable
val o3 = o1 zip o2 map {case (i1, i2) => i2 / i1 } // this will trigger a division by 0 error
val o4 = o3 lift {
  subscriber: Subscriber[Try[Int]] => new Subscriber[Int](subscriber) {
    override def onNext(v: Int) {
      if (!subscriber.isUnsubscribed)
        subscriber.onNext(Success(v))
    }
    override def onError(e: Throwable) {
      if (!subscriber.isUnsubscribed)
        subscriber.onNext(Failure(e))
    }
    override def onCompleted() {
      if (!subscriber.isUnsubscribed)
        subscriber.onCompleted()
    }
  }
}
o4 subscribe(println(_))
Upvotes: 0
Reputation: 2193
Try this way.
Observable<Integer> o2 = o1.map(i -> 12/i).onErrorFlatMap(e -> Observable.empty());
In the code above, error events are replaced by an empty stream and the original stream resumes, so errors are skipped.
Upvotes: 1