Fadli

Reputation: 976

observeOn() impacts doOnTerminate() invocation

I have the following code:

Observable.just(10)
            .doOnTerminate(() -> Log.d("LOG", "ON TERMINATE"))
            .subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(result -> {
                int a = result / 0; // force an exception
            }, error -> {
                Log.d("LOG", "ERROR");
                error.printStackTrace();
            });

It gives this output:

LOG: ON TERMINATE

LOG: ERROR

But if I modify it slightly to:

Observable.just(10)
             .subscribeOn(Schedulers.newThread())
             .observeOn(AndroidSchedulers.mainThread())
             .doOnTerminate(() -> Log.d("LOG", "ON TERMINATE"))
             .subscribe(result -> {
                 int a = result / 0; // force an exception
             }, error -> {
                 Log.d("LOG", "ERROR");
                 error.printStackTrace();
             });

It only prints LOG: ERROR

Why is doOnTerminate not called in the second snippet?

Upvotes: 4

Views: 1504

Answers (2)

MatBos

Reputation: 2390

I think it comes down to where doOnTerminate sits in the chain. In the first case it runs on the new thread, so the events are as follows:

+-newThread-------------------------------------------------------------------------+
| emission      ----- 10 -- |(completion because only one item was emitted)         |
| doOnTerminate            call(reason for calling is that emissions completed)     |
+-mainThread------------------------------------------------------------------------+
| subscriber    ----- 10(throw div/0)---X                                           |
+-----------------------------------------------------------------------------------+

If you look at how the just operator is implemented under the hood, you will see that it calls onComplete right after onNext.

And when you move doOnTerminate to the main thread:

+-newThread-------------------------------------------------------------------------+
| emission      ----- 10 -- |(completion because only one item was emitted)         |
+-mainThread------------------------------------------------------------------------+
| doOnTerminate                                                        (not called) |
| subscriber    ----- 10(throw div/0)->X                                            |
+-----------------------------------------------------------------------------------+

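doOnTerminate is not called because the error happened in the onNext portion of the subscriber. In this case the subscriber is a LambdaObserver, which, when it detects an error in onNext, disposes the upstream and calls its own onError directly (see the link). The error therefore never travels through doOnTerminate, and the pending completion is dropped because the chain is already disposed.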
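
To make that path concrete, here is a minimal plain-Java sketch of the second chain. It is a simplified model, not RxJava's source: the `Obs` interface, the `disposed` flag and the `TerminateModel` class are hypothetical names introduced for illustration, and threading is left out entirely.

```java
import java.util.ArrayList;
import java.util.List;

class TerminateModel {
    /** Minimal stand-in for an observer; not RxJava's interface. */
    interface Obs {
        void onNext(int value);
        void onError(Throwable t);
        void onComplete();
    }

    /** Runs source -> doOnTerminate -> lambda subscriber and returns the log. */
    static List<String> run() {
        List<String> log = new ArrayList<>();
        boolean[] disposed = {false};

        // Models the subscriber built from two lambdas: when onNext throws,
        // it disposes the upstream and calls its own onError directly.
        Obs subscriber = new Obs() {
            public void onNext(int value) {
                try {
                    int a = value / 0; // force an exception
                } catch (Throwable t) {
                    disposed[0] = true;
                    onError(t);
                }
            }
            public void onError(Throwable t) { log.add("ERROR"); }
            public void onComplete() { log.add("COMPLETE"); }
        };

        // Models doOnTerminate: it only reacts to terminal events that
        // actually pass through it on their way downstream.
        Obs doOnTerminate = new Obs() {
            public void onNext(int value) { subscriber.onNext(value); }
            public void onError(Throwable t) { log.add("ON TERMINATE"); subscriber.onError(t); }
            public void onComplete() { log.add("ON TERMINATE"); subscriber.onComplete(); }
        };

        // Models just(10) after observeOn: onNext, then onComplete unless disposed.
        doOnTerminate.onNext(10);
        if (!disposed[0]) {
            doOnTerminate.onComplete();
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [ERROR]
    }
}
```

In this model the exception is handled entirely inside the subscriber, so the only event doOnTerminate ever sees is onNext.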


My test code:

    final long[] start = {0};
    Observable.just(10)
            .subscribeOn(Schedulers.newThread())
            .doOnNext(integer -> {
                start[0] = System.currentTimeMillis();
                message("onNext", start[0]);
            })
            .doOnTerminate(() -> message("doOnTerminate", start[0]))
            .doFinally(() -> message("doFinally", start[0]))
            .doAfterTerminate(() -> message("doAfterTerminate", start[0]))
            .observeOn(AndroidSchedulers.mainThread())
            .doOnTerminate(() -> message("doOnTerminate", start[0]))
            .doFinally(() -> message("doFinally", start[0]))
            .doAfterTerminate(() -> message("doAfterTerminate", start[0]))
            .subscribe(integer -> {
                        message("Next", start[0]);
                        int a = integer / 0;
                    },
                    throwable -> message("ERROR", start[0])
            );

    void message(String message, long start) {
        Log.d("LOG", message + " " + Thread.currentThread().getName() + " " + (System.currentTimeMillis() - start));
    }

And its output:

D/LOG: onNext RxNewThreadScheduler-1 0
D/LOG: doOnTerminate RxNewThreadScheduler-1 0
D/LOG: doAfterTerminate RxNewThreadScheduler-1 0
D/LOG: doFinally RxNewThreadScheduler-1 0
D/LOG: Next main 16
D/LOG: doFinally main 16
D/LOG: ERROR main 16

Solution

To be safe in this case, and at the same time avoid missing the unsubscription (disposal) event, I would use the doFinally operator.

All of this is based on RxJava 2.0.4.
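
As a rough illustration of why doFinally still fires here, the sketch below models a stage that runs its action at most once, whichever of error, completion or disposal arrives first. It is plain Java under my own assumptions, not the library's implementation: `FinallyModel` and `FinallyStage` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

class FinallyModel {
    /** Runs an action at most once on error, completion or disposal. */
    static final class FinallyStage {
        private final AtomicBoolean done = new AtomicBoolean();
        private final Runnable action;

        FinallyStage(Runnable action) { this.action = action; }

        private void runOnce() {
            // Only the first caller flips the flag and runs the action.
            if (done.compareAndSet(false, true)) {
                action.run();
            }
        }

        void onError() { runOnce(); }
        void onComplete() { runOnce(); }
        void dispose() { runOnce(); }
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        FinallyStage stage = new FinallyStage(() -> log.add("doFinally"));

        // The subscriber threw in onNext: it disposes the upstream first,
        // then reports the error to itself.
        stage.dispose();
        log.add("ERROR");

        // A late completion must not run the action a second time.
        stage.onComplete();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [doFinally, ERROR]
    }
}
```

The order matches the log above, where "doFinally main" appears just before "ERROR main".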

Upvotes: 4

Tassos Bassoukos

Reputation: 16142

doOnTerminate will NOT be called on unsubscription, only on onError or onComplete. As your error occurs after the operator, only unsubscribe will reach it.

Add .doOnUnsubscribe(() -> Log.d("LOG", "ON TERMINATE")) after the doOnTerminate.

Upvotes: 0
