Franklin Wang
Franklin Wang

Reputation: 333

Error handling when using Observable.flatMap

There is a sample program below that replicates my issue.

Issue:

  1. Apply the flatMap transformation to some Observable
  2. Subscribe to the aforementioned Observable, store the subscription somewhere
  3. Dispose of the aforementioned subscription before the Observable terminates naturally
  4. In an Observable returned by the mapper function, raise an Exception
  5. Flatmap operator doesn't know how to handle the Exception, raises it, program crashes/quits

Preferred/expected behaviour:

The culprit is the snippet of code below, found in ObservableFlatMap. The issue is that once the parent has been disposed of, invocations to addThrowable return false. Thus, the error is never propagated down to onError.

@Override
public void onError(Throwable t) {
  if (parent.errors.addThrowable(t)) {
      if (!parent.delayErrors) {
          parent.disposeAll();
      }
      done = true;
      parent.drain();
  } else {
      RxJavaPlugins.onError(t);
  }
}

What can I do in this situation? I need an operator that acts like flatMap and propagates errors down to my onError handler instead of crashing my program.

This is a real scenario with an Android app of mine. Subscriptions are automatically disposed when the user exits a window/activity and exceptions may be raised after disposal due to InterruptedIOExceptions.

Code to replicate issue

import io.reactivex.Observable;
import io.reactivex.disposables.Disposable;
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.schedulers.Schedulers;

public class Main {

  public static void main(String[] args) throws InterruptedException {
    RxJavaPlugins.setErrorHandler((throwable)->{
      System.out.println("Please don't come through here");
      throwable.printStackTrace();
    });
    Disposable disposable = Observable.just(1)
        .subscribeOn(Schedulers.computation())
        .flatMap((item)->{
          return Observable.just(1)
              .doOnNext((arg)->Thread.sleep(1000))
              .doOnNext((arg)->{
                throw new RuntimeException("Error");
              });
        })
        .subscribe(System.out::println, (throwable)->{
          System.out.println("Please come through here");
          throwable.printStackTrace();
        });
    Thread.sleep(500);
    disposable.dispose();
    Thread.sleep(1000);
  }

}

Execution output

Please don't come through here
io.reactivex.exceptions.UndeliverableException: java.lang.InterruptedException: sleep interrupted
  at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:349)
  at io.reactivex.internal.operators.observable.ObservableFlatMap$InnerObserver.onError(ObservableFlatMap.java:573)
  at io.reactivex.internal.operators.observable.ObservableDoOnEach$DoOnEachObserver.onError(ObservableDoOnEach.java:119)
  at io.reactivex.internal.operators.observable.ObservableDoOnEach$DoOnEachObserver.onError(ObservableDoOnEach.java:119)
  at io.reactivex.internal.operators.observable.ObservableDoOnEach$DoOnEachObserver.onNext(ObservableDoOnEach.java:99)
  at io.reactivex.internal.operators.observable.ObservableScalarXMap$ScalarDisposable.run(ObservableScalarXMap.java:248)
  at io.reactivex.internal.operators.observable.ObservableJust.subscribeActual(ObservableJust.java:35)
  at io.reactivex.Observable.subscribe(Observable.java:10903)
  at io.reactivex.internal.operators.observable.ObservableDoOnEach.subscribeActual(ObservableDoOnEach.java:42)
  at io.reactivex.Observable.subscribe(Observable.java:10903)
  at io.reactivex.internal.operators.observable.ObservableDoOnEach.subscribeActual(ObservableDoOnEach.java:42)
  at io.reactivex.Observable.subscribe(Observable.java:10903)
  at io.reactivex.internal.operators.observable.ObservableFlatMap$MergeObserver.subscribeInner(ObservableFlatMap.java:162)
  at io.reactivex.internal.operators.observable.ObservableFlatMap$MergeObserver.onNext(ObservableFlatMap.java:139)
  at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeOnObserver.onNext(ObservableSubscribeOn.java:58)
  at io.reactivex.internal.operators.observable.ObservableScalarXMap$ScalarDisposable.run(ObservableScalarXMap.java:248)
  at io.reactivex.internal.operators.observable.ObservableJust.subscribeActual(ObservableJust.java:35)
  at io.reactivex.Observable.subscribe(Observable.java:10903)
  at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeTask.run(ObservableSubscribeOn.java:96)
  at io.reactivex.internal.schedulers.ScheduledDirectTask.call(ScheduledDirectTask.java:38)
  at io.reactivex.internal.schedulers.ScheduledDirectTask.call(ScheduledDirectTask.java:26)
  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
  at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
  at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.InterruptedException: sleep interrupted
  at java.lang.Thread.sleep(Native Method)
  at Main.lambda$null$1(Main.java:18)
  at io.reactivex.internal.operators.observable.ObservableDoOnEach$DoOnEachObserver.onNext(ObservableDoOnEach.java:95)
  ... 22 more

Expected/preferred output

Please come through here
java.lang.RuntimeException: Error
  at Main.lambda$null$2(Main.java:19)
  at io.reactivex.internal.operators.observable.ObservableDoOnEach$DoOnEachObserver.onNext(ObservableDoOnEach.java:95)
  at io.reactivex.internal.operators.observable.ObservableDoOnEach$DoOnEachObserver.onNext(ObservableDoOnEach.java:103)
  at io.reactivex.internal.operators.observable.ObservableScalarXMap$ScalarDisposable.run(ObservableScalarXMap.java:248)
  at io.reactivex.internal.operators.observable.ObservableJust.subscribeActual(ObservableJust.java:35)
  at io.reactivex.Observable.subscribe(Observable.java:10903)
  at io.reactivex.internal.operators.observable.ObservableDoOnEach.subscribeActual(ObservableDoOnEach.java:42)
  at io.reactivex.Observable.subscribe(Observable.java:10903)
  at io.reactivex.internal.operators.observable.ObservableDoOnEach.subscribeActual(ObservableDoOnEach.java:42)
  at io.reactivex.Observable.subscribe(Observable.java:10903)
  at io.reactivex.internal.operators.observable.ObservableFlatMap$MergeObserver.subscribeInner(ObservableFlatMap.java:162)
  at io.reactivex.internal.operators.observable.ObservableFlatMap$MergeObserver.onNext(ObservableFlatMap.java:139)
  at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeOnObserver.onNext(ObservableSubscribeOn.java:58)
  at io.reactivex.internal.operators.observable.ObservableScalarXMap$ScalarDisposable.run(ObservableScalarXMap.java:248)
  at io.reactivex.internal.operators.observable.ObservableJust.subscribeActual(ObservableJust.java:35)
  at io.reactivex.Observable.subscribe(Observable.java:10903)
  at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeTask.run(ObservableSubscribeOn.java:96)
  at io.reactivex.internal.schedulers.ScheduledDirectTask.call(ScheduledDirectTask.java:38)
  at io.reactivex.internal.schedulers.ScheduledDirectTask.call(ScheduledDirectTask.java:26)
  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
  at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
  at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:748)

Upvotes: 2

Views: 5336

Answers (1)

DDH
DDH

Reputation: 29

The problem is with your .subscribeOn(Schedulers.computation()) before flatMap. When you dispose your interrupting thread that produces flat map observables which breaks the whole subscription. To fix this your should specify subscription after or inside flatMap or observe it in a different thread.

Working example:

RxJavaPlugins.setErrorHandler((throwable) -> {
        System.out.println("Please don't come through here");
        throwable.printStackTrace();
    });
    Disposable disposable = Observable.just(1)
            .flatMap((item) -> {
                return Observable.just(1)

                        .doOnNext((arg) -> Thread.sleep(1000))
                        .doOnNext((arg) -> {
                            throw new IllegalStateException("Error");
                        })
                        .subscribeOn(Schedulers.computation());
            })
            .subscribe(System.out::println,
                    (throwable) -> {
                        System.out.println("Please come through here");
                        throwable.printStackTrace();
                    });
    Thread.sleep(500);
    disposable.dispose();
    Thread.sleep(1000);

Upvotes: 1

Related Questions