Reputation: 333
There is a sample program below that replicates my issue.
Issue:
flatMap
transformation to some Observable
Observable
, store the subscription somewhereObservable
terminates naturallyObservable
returned by the mapper function, raise an Exception
Exception
, raises it, program crashes/quitsPreferred/expected behaviour:
onError
handler instead of crashing the program when RxJavaPlugins#onError
is invokedThe culprit is the snippet of code below, found in ObservableFlatMap
. The issue is that once the parent has been disposed of, invocations to addThrowable
return false. Thus, the error is never propagated down to onError
.
@Override
public void onError(Throwable t) {
if (parent.errors.addThrowable(t)) {
if (!parent.delayErrors) {
parent.disposeAll();
}
done = true;
parent.drain();
} else {
RxJavaPlugins.onError(t);
}
}
What can I do in this situation? I need an operator that acts like flatMap
and propagates errors down to my onError
handler instead of crashing my program.
This is a real scenario with an Android app of mine. Subscriptions are automatically disposed when the user exits a window/activity and exceptions may be raised after disposal due to InterruptedIOException
s.
Code to replicate issue
import io.reactivex.Observable;
import io.reactivex.disposables.Disposable;
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.schedulers.Schedulers;
public class Main {
public static void main(String[] args) throws InterruptedException {
RxJavaPlugins.setErrorHandler((throwable)->{
System.out.println("Please don't come through here");
throwable.printStackTrace();
});
Disposable disposable = Observable.just(1)
.subscribeOn(Schedulers.computation())
.flatMap((item)->{
return Observable.just(1)
.doOnNext((arg)->Thread.sleep(1000))
.doOnNext((arg)->{
throw new RuntimeException("Error");
});
})
.subscribe(System.out::println, (throwable)->{
System.out.println("Please come through here");
throwable.printStackTrace();
});
Thread.sleep(500);
disposable.dispose();
Thread.sleep(1000);
}
}
Execution output
Please don't come through here
io.reactivex.exceptions.UndeliverableException: java.lang.InterruptedException: sleep interrupted
at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:349)
at io.reactivex.internal.operators.observable.ObservableFlatMap$InnerObserver.onError(ObservableFlatMap.java:573)
at io.reactivex.internal.operators.observable.ObservableDoOnEach$DoOnEachObserver.onError(ObservableDoOnEach.java:119)
at io.reactivex.internal.operators.observable.ObservableDoOnEach$DoOnEachObserver.onError(ObservableDoOnEach.java:119)
at io.reactivex.internal.operators.observable.ObservableDoOnEach$DoOnEachObserver.onNext(ObservableDoOnEach.java:99)
at io.reactivex.internal.operators.observable.ObservableScalarXMap$ScalarDisposable.run(ObservableScalarXMap.java:248)
at io.reactivex.internal.operators.observable.ObservableJust.subscribeActual(ObservableJust.java:35)
at io.reactivex.Observable.subscribe(Observable.java:10903)
at io.reactivex.internal.operators.observable.ObservableDoOnEach.subscribeActual(ObservableDoOnEach.java:42)
at io.reactivex.Observable.subscribe(Observable.java:10903)
at io.reactivex.internal.operators.observable.ObservableDoOnEach.subscribeActual(ObservableDoOnEach.java:42)
at io.reactivex.Observable.subscribe(Observable.java:10903)
at io.reactivex.internal.operators.observable.ObservableFlatMap$MergeObserver.subscribeInner(ObservableFlatMap.java:162)
at io.reactivex.internal.operators.observable.ObservableFlatMap$MergeObserver.onNext(ObservableFlatMap.java:139)
at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeOnObserver.onNext(ObservableSubscribeOn.java:58)
at io.reactivex.internal.operators.observable.ObservableScalarXMap$ScalarDisposable.run(ObservableScalarXMap.java:248)
at io.reactivex.internal.operators.observable.ObservableJust.subscribeActual(ObservableJust.java:35)
at io.reactivex.Observable.subscribe(Observable.java:10903)
at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeTask.run(ObservableSubscribeOn.java:96)
at io.reactivex.internal.schedulers.ScheduledDirectTask.call(ScheduledDirectTask.java:38)
at io.reactivex.internal.schedulers.ScheduledDirectTask.call(ScheduledDirectTask.java:26)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.InterruptedException: sleep interrupted
at java.lang.Thread.sleep(Native Method)
at Main.lambda$null$1(Main.java:18)
at io.reactivex.internal.operators.observable.ObservableDoOnEach$DoOnEachObserver.onNext(ObservableDoOnEach.java:95)
... 22 more
Expected/preferred output
Please come through here
java.lang.RuntimeException: Error
at Main.lambda$null$2(Main.java:19)
at io.reactivex.internal.operators.observable.ObservableDoOnEach$DoOnEachObserver.onNext(ObservableDoOnEach.java:95)
at io.reactivex.internal.operators.observable.ObservableDoOnEach$DoOnEachObserver.onNext(ObservableDoOnEach.java:103)
at io.reactivex.internal.operators.observable.ObservableScalarXMap$ScalarDisposable.run(ObservableScalarXMap.java:248)
at io.reactivex.internal.operators.observable.ObservableJust.subscribeActual(ObservableJust.java:35)
at io.reactivex.Observable.subscribe(Observable.java:10903)
at io.reactivex.internal.operators.observable.ObservableDoOnEach.subscribeActual(ObservableDoOnEach.java:42)
at io.reactivex.Observable.subscribe(Observable.java:10903)
at io.reactivex.internal.operators.observable.ObservableDoOnEach.subscribeActual(ObservableDoOnEach.java:42)
at io.reactivex.Observable.subscribe(Observable.java:10903)
at io.reactivex.internal.operators.observable.ObservableFlatMap$MergeObserver.subscribeInner(ObservableFlatMap.java:162)
at io.reactivex.internal.operators.observable.ObservableFlatMap$MergeObserver.onNext(ObservableFlatMap.java:139)
at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeOnObserver.onNext(ObservableSubscribeOn.java:58)
at io.reactivex.internal.operators.observable.ObservableScalarXMap$ScalarDisposable.run(ObservableScalarXMap.java:248)
at io.reactivex.internal.operators.observable.ObservableJust.subscribeActual(ObservableJust.java:35)
at io.reactivex.Observable.subscribe(Observable.java:10903)
at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeTask.run(ObservableSubscribeOn.java:96)
at io.reactivex.internal.schedulers.ScheduledDirectTask.call(ScheduledDirectTask.java:38)
at io.reactivex.internal.schedulers.ScheduledDirectTask.call(ScheduledDirectTask.java:26)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Upvotes: 2
Views: 5336
Reputation: 29
The problem is with your .subscribeOn(Schedulers.computation())
before flatMap
. When you dispose
your interrupting thread that produces flat map observables which breaks the whole subscription. To fix this your should specify subscription after or inside flatMap
or observe it in a different thread.
Working example:
RxJavaPlugins.setErrorHandler((throwable) -> {
System.out.println("Please don't come through here");
throwable.printStackTrace();
});
Disposable disposable = Observable.just(1)
.flatMap((item) -> {
return Observable.just(1)
.doOnNext((arg) -> Thread.sleep(1000))
.doOnNext((arg) -> {
throw new IllegalStateException("Error");
})
.subscribeOn(Schedulers.computation());
})
.subscribe(System.out::println,
(throwable) -> {
System.out.println("Please come through here");
throwable.printStackTrace();
});
Thread.sleep(500);
disposable.dispose();
Thread.sleep(1000);
Upvotes: 1