user10776303

Reputation: 241

Disposable object is null after subscription

I wrote the example below to learn functional programming with Rx. I am trying to treat a HandlerThread as an observable: in onResume(), I subscribe to a Single.just observable that starts the HandlerThread.

In the SingleObserver callbacks, the value arrives in onSuccess() as expected, but the Disposable object in onSubscribe() is always null.

I have also posted the logcat output. Please have a look at it and let me know why the Disposable object d is null.

code:

onResume() {
    this.mMyHandlerThreadInitSingleObs = Single.just(this.getInitializedHandlerThread())
            .map(myHandlerThread->{
                Log.d(TAG_LOG, "BEFORE .start()");
                myHandlerThread.start();
                Log.d(TAG_LOG, "AFTER .start()");

                return this.mMyHandlerThread;
            });
    this.mMyHandlerThreadInitSingleObs
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(this.getSingleObserver());
}

private SingleObserver<HandlerThread> getSingleObserver() {
    String TAG_LOG = ActMain.TAG_LOG + "." + "getSingleObserver()";
    return new SingleObserver<HandlerThread>() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.v(TAG_LOG, "[onSubscribe] d: " + d);
        }

        @Override
        public void onSuccess(HandlerThread is) {
            Log.v(TAG_LOG, "[onSuccess] is: " + is);
        }

        @Override
        public void onError(Throwable e) {
            Log.e(TAG_LOG, "[onError]");
        }
    };
}

logcat:

2018-12-22 14:56:50.329 12611-12611 V/ActMain: onStart
2018-12-22 14:56:50.332 12611-12611 V/ActMain.MyHandlerThread: constructor called
2018-12-22 14:56:50.333 12611-12611 V/ActMain.getSingleObserver(): [onSubscribe] d: null//<--------------------
2018-12-22 14:56:50.349 12611-12611 D/ActMain.onResume(): BEFORE .start()
2018-12-22 14:56:50.349 12611-12611 D/ActMain.onResume(): AFTER .start()
2018-12-22 14:56:50.350 12611-12630 V/ActMain.MyHandlerThread.onLooperPrepared: ..
2018-12-22 14:56:50.350 12611-12630 D/ActMain.MyHandlerThread.onLooperPrepared: this.getLooper(): Looper (my HandlerThread, tid 416) {2f35ee2}
2018-12-22 14:56:50.363 12611-12630 I/ActMain.MyHandlerThread.onLooperPrepared: [onSubscribe] d: null
2018-12-22 14:56:50.377 12611-12633 D/ActMain.MyHandlerThread.onLooperPrepared.emitter.onComplete():: this.getLooper() initialized: Looper (my HandlerThread, tid 416) {2f35ee2}
2018-12-22 14:56:50.377 12611-12633 I/ActMain.MyHandlerThread.onLooperPrepared: [onComplete]
2018-12-22 14:56:50.425 12611-12611 V/ActMain.getSingleObserver(): [onSuccess] is: Thread[my HandlerThread,5,main]
2018-12-22 14:56:50.514 1700-1724 I/ActivityManager: Displayed com.example.amrbakri.rxhandlerthread_01/.ActMain: +340ms

Upvotes: 1

Views: 1396

Answers (1)

AnxGotta

Reputation: 1006

This turns out to be an interesting question. From the RxJava 2 docs for Single:

Single behaves similarly to Observable except that it can only emit either a single successful value or an error (there is no "onComplete" notification as there is for an Observable).

The Single class implements the SingleSource base interface and the default consumer type it interacts with is the SingleObserver via the subscribe(SingleObserver) method.

It looks like you're using the SingleObserver rather than the DisposableSingleObserver.

The docs mention that:

Note that by design, subscriptions via subscribe(SingleObserver) can't be disposed from the outside (hence the void return of the subscribe(SingleObserver) method) and it is the responsibility of the implementor of the SingleObserver to allow this to happen. RxJava supports such usage with the standard DisposableSingleObserver instance. For convenience, the subscribeWith(SingleObserver) method is provided as well to allow working with a SingleObserver (or subclass) instance to be applied with in a fluent manner (such as in the example above).
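To make the quoted design point concrete, here is a minimal sketch in plain Java of why a void subscribe() leaves the caller without a handle while subscribeWith() hands the observer back. MiniObserver, MiniDisposableObserver and MiniSingle are hypothetical stand-ins invented for this illustration; they are not RxJava classes and leave out onSubscribe(), onError() and scheduling entirely.

```java
import java.util.concurrent.atomic.AtomicBoolean;

class SubscribeWithSketch {
    /** Hypothetical stand-in for SingleObserver (onSubscribe/onError omitted). */
    interface MiniObserver<T> {
        void onSuccess(T value);
    }

    /** Hypothetical observer that doubles as its own cancellation handle. */
    static class MiniDisposableObserver<T> implements MiniObserver<T> {
        private final AtomicBoolean disposed = new AtomicBoolean();
        T received; // stays null if the observer was disposed before emission

        void dispose() { disposed.set(true); }
        boolean isDisposed() { return disposed.get(); }

        @Override
        public void onSuccess(T value) {
            if (!isDisposed()) {
                received = value;
            }
        }
    }

    /** Hypothetical source; emit() stands in for a later, asynchronous delivery. */
    static class MiniSingle<T> {
        private final T value;
        private MiniObserver<? super T> observer;

        MiniSingle(T value) { this.value = value; }

        // Returns void: the caller gets nothing back to cancel with.
        void subscribe(MiniObserver<? super T> o) { this.observer = o; }

        // Returns the same observer so a fluent chain can keep it as a handle.
        <E extends MiniObserver<? super T>> E subscribeWith(E o) {
            subscribe(o);
            return o;
        }

        void emit() {
            if (observer != null) {
                observer.onSuccess(value);
            }
        }
    }

    public static void main(String[] args) {
        MiniSingle<String> single = new MiniSingle<>("Hello World");
        MiniDisposableObserver<String> handle =
                single.subscribeWith(new MiniDisposableObserver<String>());
        handle.dispose(); // cancel before the value is delivered
        single.emit();
        System.out.println("received: " + handle.received);
    }
}
```

In this simplified model the only way to cancel is through the observer itself, which is why the fluent variant returns it.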

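So try subscribing with a DisposableSingleObserver through subscribeWith() instead, which returns the observer so it can be kept as a Disposable: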

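import io.reactivex.Single;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.DisposableSingleObserver;
import io.reactivex.schedulers.Schedulers;
import java.util.concurrent.TimeUnit;

// subscribeWith() returns the observer it was given. DisposableSingleObserver
// implements Disposable, so the result can be stored and disposed later.
Disposable d = Single.just("Hello World")
    .delay(10, TimeUnit.SECONDS, Schedulers.io())
    .subscribeWith(new DisposableSingleObserver<String>() {
        @Override
        public void onStart() {
            // Called once when the subscription is established.
            System.out.println("Started");
        }

        @Override
        public void onSuccess(String value) {
            System.out.println("Success: " + value);
        }

        @Override
        public void onError(Throwable error) {
            error.printStackTrace();
        }
    });

// Later, when the result is no longer needed: d.dispose();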
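
It may also be worth checking d == null directly instead of relying on the log line, since "d: " + d only shows what d.toString() returns. This rests on an assumption about RxJava 2.x internals that the documentation quoted above does not state: the observer wrappers behind subscribeOn() and observeOn() appear to extend AtomicReference<Disposable>, and AtomicReference.toString() prints the value currently held, not the object itself. The sketch below reproduces that effect with the JDK alone; WrapperDisposable is a hypothetical stand-in, not an RxJava class.

```java
import java.util.concurrent.atomic.AtomicReference;

class ToStringNullSketch {
    /** Hypothetical wrapper: a real object whose held reference is still empty. */
    static class WrapperDisposable extends AtomicReference<Object> {
        private volatile boolean disposed;

        void dispose() { disposed = true; }
        boolean isDisposed() { return disposed; }
    }

    /** Same pattern as the question's log line: concatenation calls toString(). */
    static String describe(Object d) {
        return "d: " + d;
    }

    public static void main(String[] args) {
        WrapperDisposable d = new WrapperDisposable();
        System.out.println(describe(d));              // prints "d: null"
        System.out.println("is null? " + (d == null)); // prints "is null? false"
        d.dispose();                                   // the object is usable
        System.out.println("disposed? " + d.isDisposed()); // prints "disposed? true"
    }
}
```

If the same thing is happening in the question's code, logging d == null or d.isDisposed() in onSubscribe() would show that the Disposable is present and only its string form reads "null".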

Upvotes: 1
