smallufo
smallufo

Reputation: 11796

RxJava Observable and Subscriber for skipping exception?

If I have an Observalbe :

List<Integer> ints = Lists.newArrayList(1, 2, 0, 3, 4);
Observable<Integer> o1 = Observable.from(ints);

I want to generate another observable , which divide by 12 :

Observable<Integer> o2 = o1.map(i -> 12/i);
o2.subscribe(
  v -> logger.info ("Subscriber value {}", v) ,
  t -> logger.error("Subscriber onError {} : {}", t.getClass() , t.getMessage())
);

It's obvious it will got error , and stopped when it encounter '0' :

RxTest - Subscriber value 12
RxTest - Subscriber value 6
RxTest - Subscriber onError class java.lang.ArithmeticException : / by zero

But what if I want the Observer(o2) skip the exception ?

I look into RxJava's doc about error handling , there is no way to skip the error. The onErrorResumeNext() and onExceptionResumeNext() needs a backup/fallback Observable , which is not what I want. The onErrorReturn need to specify the return value .

All three of the error handling methods cannot resume the original observer . for example :

Observable<Integer> o2 = o1.map(i -> 12/i)
      .onErrorReturn(t -> 0);

It prints :

RxTest - Subscriber value 12
RxTest - Subscriber value 6
RxTest - Subscriber value 0

Not printing the rest 12/3 and 12/4

The only solution seems relay in the map function :

Observable<Integer> o2 = o1.map(i -> {
  try {
    return Optional.of(12/i);
  } catch (ArithmeticException e) {
    return Optional.empty();
  }
}).filter(Optional::isPresent)
  .map(o -> (Integer) o.get());

It works , but it is cumbersome . I wonder if there's any way to easily skip any RuntimeException when manipulating Observable (such as map)

The above is about skipping exception in Observable . The following is about skipping exception in the Subscriber :

The situation is the same :

List<Integer> ints = Lists.newArrayList(1, 2, 0 , 3 , 4);
Observable<Integer> o1 = Observable.from(ints);
o1.subscribe(
  i -> logger.info("12 / {} = {}", i, 12 / i),
  t -> logger.error("{} : {}", t.getClass() , t.getMessage()),
  () -> logger.info("onCompleted")
);

It prints out :

RxTest - 12 / 1 = 12
RxTest - 12 / 2 = 6
RxTest - class java.lang.ArithmeticException : / by zero

When exception occurs in onNext , it triggers onError , and NOT RESPONDING to any data from the Observable . If I want the subscriber to swallow the exception , I have to try-catch the ArithmeticException in the onNext() . Is there any cleaner solution ?

It seems when a Subscriber faces an error in the onNext() that cannot be handled within (onNext) , it shall stop , right ? Is it a good design ?

Upvotes: 10

Views: 5221

Answers (3)

krp
krp

Reputation: 2247

Observable.just(1, 2, 3, 0, 4)
            .flatMap(i -> Observable.defer(() -> Observable.just(12 / i))
                    .onErrorResumeNext(Observable.just(0)));

it's one way to go, though keep in mind that RxJava assumes that error is something truly unexcpeted (you can expect values to be 0). On the other hand, if you want to ignore divide by 0 exception maybe you should filter/map your values before division.

Upvotes: 5

boggy
boggy

Reputation: 4027

import rx.lang.scala._

import scala.concurrent.duration.DurationDouble
import scala.util.{Failure, Success, Try}

val o1 = List(1, 2, 3, 0, 4) toObservable
val o2 = List(1, 2, 3, 4, 5) toObservable

val o3 = o1 zip o2 map {case (i1, i2) => i2 / i1 } // this will trigger a division by 0 error

val o4 = o3 lift {  
      subscriber: Subscriber[Try[Int]] => new Subscriber[Int](subscriber) {
        override def onNext(v: Int) {
          if (!subscriber.isUnsubscribed)
            subscriber.onNext(Success(v))
        }

        override def onError(e: Throwable) {
          if (!subscriber.isUnsubscribed)
            subscriber.onNext(Failure(e))
        }

        override def onCompleted() {
          if (!subscriber.isUnsubscribed)
            subscriber.onCompleted()
        }
      }
    }

o4 subscribe(println(_))

Upvotes: 0

findall
findall

Reputation: 2193

Try this way.

Observable<Integer> o2 = o1.map(i -> 12/i).onErrorFlatMap(e -> Observable.empty());

In the above code, error events are replaced by an empty stream and the original stream is resuming. Consequently errors are skipped.

Upvotes: 1

Related Questions