Reputation: 976
I have the following code:
Observable.just(10)
        .doOnTerminate(() -> Log.d("LOG", "ON TERMINATE"))
        .subscribeOn(Schedulers.newThread())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(result -> {
            int a = result / 0; // force an exception
        }, error -> {
            Log.d("LOG", "ERROR");
            error.printStackTrace();
        });
Which will give output:
LOG: ON TERMINATE
LOG: ERROR
But if I modify it slightly to:
Observable.just(10)
        .subscribeOn(Schedulers.newThread())
        .observeOn(AndroidSchedulers.mainThread())
        .doOnTerminate(() -> Log.d("LOG", "ON TERMINATE"))
        .subscribe(result -> {
            int a = result / 0; // force an exception
        }, error -> {
            Log.d("LOG", "ERROR");
            error.printStackTrace();
        });
It only prints LOG: ERROR
Why is doOnTerminate not called in the second snippet?
Upvotes: 4
Views: 1504
Reputation: 2390
I think it comes down to where doOnTerminate sits in the chain. In the first case it runs on the new thread, so the events are as follows:
+-newThread-------------------------------------------------------------------------+
| emission ----- 10 -- |(completion because only one item was emitted) |
| doOnTerminate call(reason for calling is that emissions completed) |
+-mainThread------------------------------------------------------------------------+
| subscriber ----- 10(throw div/0)---X |
+-----------------------------------------------------------------------------------+
If you look at how the just operator is implemented under the hood, you will see that it calls onComplete right after onNext.
And when you move doOnTerminate to the main thread:
+-newThread-------------------------------------------------------------------------+
| emission ----- 10 -- |(completion because only one item was emitted) |
+-mainThread------------------------------------------------------------------------+
| doOnTerminate (not called) |
| subscriber ----- 10(throw div/0)->X |
+-----------------------------------------------------------------------------------+
doOnTerminate is not called because the error happens in the onNext portion of the subscriber. In this case the subscriber is a LambdaObserver, which, when it detects an error in onNext, calls its own onError directly (see the link).
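To make that mechanism concrete, here is a minimal plain-Java sketch that does not use RxJava at all. The names DoOnTerminateModel, Observer, subscriber and emit are hypothetical stand-ins, and the shared disposed flag is a simplifying assumption for how disposal stops the upstream from delivering completion. It only illustrates why an operator placed directly above the subscriber never sees the error.

```java
import java.util.ArrayList;
import java.util.List;

final class DoOnTerminateModel {

    /** Minimal observer contract; threading is ignored in this model. */
    interface Observer {
        void onNext(int value);
        void onError(Throwable error);
        void onComplete();
    }

    /** Models doOnTerminate: the action runs only when a terminal event arrives from upstream. */
    static Observer doOnTerminate(Runnable action, Observer downstream) {
        return new Observer() {
            @Override public void onNext(int value) { downstream.onNext(value); }
            @Override public void onError(Throwable error) { action.run(); downstream.onError(error); }
            @Override public void onComplete() { action.run(); downstream.onComplete(); }
        };
    }

    /** Models the lambda-based subscriber: a crash in onNext goes straight to its own error handler. */
    static Observer subscriber(List<String> log, boolean[] disposed) {
        return new Observer() {
            @Override public void onNext(int value) {
                try {
                    int a = value / 0; // force an exception, as in the question
                } catch (Throwable t) {
                    disposed[0] = true; // assumption: disposal is a shared flag
                    onError(t);         // invoked on this object, bypassing upstream operators
                }
            }
            @Override public void onError(Throwable error) { log.add("ERROR"); }
            @Override public void onComplete() { log.add("COMPLETE"); }
        };
    }

    /** Models the source: one item, then completion unless the chain was disposed. */
    static void emit(Observer first, boolean[] disposed) {
        first.onNext(10);
        if (!disposed[0]) {
            first.onComplete();
        }
    }

    /** Builds the chain "doOnTerminate -> subscriber" and returns what was logged. */
    static List<String> run() {
        List<String> log = new ArrayList<>();
        boolean[] disposed = {false};
        Observer chain = doOnTerminate(() -> log.add("ON TERMINATE"), subscriber(log, disposed));
        emit(chain, disposed);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [ERROR]
    }
}
```

In this model the doOnTerminate wrapper only forwards onNext; the error is raised and handled below it, and completion is never delivered once the chain is disposed.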
My test code:
final long[] start = {0};
Observable.just(10)
        .subscribeOn(Schedulers.newThread())
        .doOnNext(integer -> {
            start[0] = System.currentTimeMillis();
            message("onNext", start[0]);
        })
        .doOnTerminate(() -> message("doOnTerminate", start[0]))
        .doFinally(() -> message("doFinally", start[0]))
        .doAfterTerminate(() -> message("doAfterTerminate", start[0]))
        .observeOn(AndroidSchedulers.mainThread())
        .doOnTerminate(() -> message("doOnTerminate", start[0]))
        .doFinally(() -> message("doFinally", start[0]))
        .doAfterTerminate(() -> message("doAfterTerminate", start[0]))
        .subscribe(integer -> {
                    message("Next", start[0]);
                    int a = integer / 0;
                },
                throwable -> message("ERROR", start[0])
        );

void message(String message, long start) {
    Log.d("LOG", message + " " + Thread.currentThread().getName() + " " + (System.currentTimeMillis() - start));
}
And its output:
D/LOG: onNext RxNewThreadScheduler-1 0
D/LOG: doOnTerminate RxNewThreadScheduler-1 0
D/LOG: doAfterTerminate RxNewThreadScheduler-1 0
D/LOG: doFinally RxNewThreadScheduler-1 0
D/LOG: Next main 16
D/LOG: doFinally main 16
D/LOG: ERROR main 16
Solution
To guard against this case, and also against missing the unsubscription (disposal) event, I would use the doFinally operator.
All this is based on the 2.0.4 version of RxJava.
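As a rough illustration of why doFinally still fires, the plain-Java sketch below models a single operator stage above the subscriber. It does not use RxJava; DoFinallyModel, Stage and run are hypothetical names, and the assumption is that the doFinally action runs at most once, on either a terminal event or disposal, while the doOnTerminate action runs only on a terminal event.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

final class DoFinallyModel {

    /** Models the operator stage sitting just above the subscriber. */
    static final class Stage {
        private final AtomicBoolean finallyRan = new AtomicBoolean();
        private final List<String> log;
        private boolean disposed;

        Stage(List<String> log) {
            this.log = log;
        }

        /** Terminal event from upstream: doOnTerminate fires, then doFinally. */
        void complete() {
            if (disposed) {
                return; // assumption: nothing is delivered after disposal
            }
            log.add("doOnTerminate");
            runFinally();
        }

        /** Disposal coming from the subscriber: only doFinally reacts. */
        void dispose() {
            disposed = true;
            runFinally();
        }

        /** Guarantees the finally action runs at most once. */
        private void runFinally() {
            if (finallyRan.compareAndSet(false, true)) {
                log.add("doFinally");
            }
        }
    }

    /** Simulates one item followed by completion; optionally crashes inside the subscriber's onNext. */
    static List<String> run(boolean crashInOnNext) {
        List<String> log = new ArrayList<>();
        Stage stage = new Stage(log);
        int zero = 0;
        try {
            log.add("Next");
            if (crashInOnNext) {
                int a = 10 / zero; // force an exception, as in the test code
            }
        } catch (ArithmeticException e) {
            stage.dispose();  // the subscriber disposes upstream first
            log.add("ERROR"); // then reports the error to its own handler
        }
        stage.complete();     // completion arrives, but is ignored if disposed
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(true));
        System.out.println(run(false));
    }
}
```

With the crash enabled, the model logs Next, doFinally, ERROR, which is the same order as the main-thread lines in the output above; without the crash, both doOnTerminate and doFinally run.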
Upvotes: 4
Reputation: 16142
doOnTerminate will NOT be called on unsubscription, only on onError or onComplete. As your error occurs after the operator, only the unsubscribe will reach it.
Add .doOnUnsubscribe(() -> Log.d("LOG", "ON TERMINATE")) after the doOnTerminate.
Upvotes: 0