In the code below, I created an example to learn functional programming with Rx. I am trying to treat a HandlerThread as an observable. In onResume(), I subscribe to a Single.just observable to start the HandlerThread.
In the SingleObserver callbacks, although I receive the value in onSuccess(), the Disposable object in onSubscribe() is always null.
I have also posted the logcat output. Please have a look and let me know why the Disposable object d is null.
code:
onResume() {
    this.mMyHandlerThreadInitSingleObs = Single.just(this.getInitializedHandlerThread())
        .map(myHandlerThread -> {
            Log.d(TAG_LOG, "BEFORE .start()");
            myHandlerThread.start();
            Log.d(TAG_LOG, "AFTER .start()");
            return this.mMyHandlerThread;
        });
    this.mMyHandlerThreadInitSingleObs
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(this.getSingleObserver());
}

private SingleObserver<HandlerThread> getSingleObserver() {
    String TAG_LOG = ActMain.TAG_LOG + "." + "getSingleObserver()";
    return new SingleObserver<HandlerThread>() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.v(TAG_LOG, "[onSubscribe] d: " + d);
        }

        @Override
        public void onSuccess(HandlerThread is) {
            Log.v(TAG_LOG, "[onSuccess] is: " + is);
        }

        @Override
        public void onError(Throwable e) {
            Log.e(TAG_LOG, "[onError]");
        }
    };
}
logcat:
2018-12-22 14:56:50.329 12611-12611 V/ActMain: onStart
2018-12-22 14:56:50.332 12611-12611 V/ActMain.MyHandlerThread: constructor called
2018-12-22 14:56:50.333 12611-12611 V/ActMain.getSingleObserver(): [onSubscribe] d: null//<--------------------
2018-12-22 14:56:50.349 12611-12611 D/ActMain.onResume(): BEFORE .start()
2018-12-22 14:56:50.349 12611-12611 D/ActMain.onResume(): AFTER .start()
2018-12-22 14:56:50.350 12611-12630 V/ActMain.MyHandlerThread.onLooperPrepared: ..
2018-12-22 14:56:50.350 12611-12630 D/ActMain.MyHandlerThread.onLooperPrepared: this.getLooper(): Looper (my HandlerThread, tid 416) {2f35ee2}
2018-12-22 14:56:50.363 12611-12630 I/ActMain.MyHandlerThread.onLooperPrepared: [onSubscribe] d: null
2018-12-22 14:56:50.377 12611-12633 D/ActMain.MyHandlerThread.onLooperPrepared.emitter.onComplete():: this.getLooper() initialized: Looper (my HandlerThread, tid 416) {2f35ee2}
2018-12-22 14:56:50.377 12611-12633 I/ActMain.MyHandlerThread.onLooperPrepared: [onComplete]
2018-12-22 14:56:50.425 12611-12611 V/ActMain.getSingleObserver(): [onSuccess] is: Thread[my HandlerThread,5,main]
2018-12-22 14:56:50.514 1700-1724 I/ActivityManager: Displayed com.example.amrbakri.rxhandlerthread_01/.ActMain: +340ms
Upvotes: 1
Views: 1396
It turns out that this is an interesting question. From the docs for RxJava2 (Single):
Single behaves similarly to Observable except that it can only emit either a single successful value or an error (there is no "onComplete" notification as there is for an Observable).
The Single class implements the SingleSource base interface and the default consumer type it interacts with is the SingleObserver via the subscribe(SingleObserver) method.
It looks like you're using SingleObserver rather than DisposableSingleObserver.
The docs mention that:
Note that by design, subscriptions via subscribe(SingleObserver) can't be disposed from the outside (hence the void return of the subscribe(SingleObserver) method) and it is the responsibility of the implementor of the SingleObserver to allow this to happen. RxJava supports such usage with the standard DisposableSingleObserver instance. For convenience, the subscribeWith(SingleObserver) method is provided as well to allow working with a SingleObserver (or subclass) instance to be applied with in a fluent manner (such as in the example above).
So try doing this instead:
Disposable d = Single.just("Hello World")
    .delay(10, TimeUnit.SECONDS, Schedulers.io())
    .subscribeWith(new DisposableSingleObserver<String>() {
        @Override
        public void onStart() {
            System.out.println("Started");
        }

        @Override
        public void onSuccess(String value) {
            System.out.println("Success: " + value);
        }

        @Override
        public void onError(Throwable error) {
            error.printStackTrace();
        }
    });
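To make the quoted design note more concrete, here is a rough plain-Java sketch of the idea that the observer itself has to keep the handle it receives in onSubscribe so the subscription can be cancelled from outside. Every type here (SimpleDisposable, SimpleSingleObserver, HoldingObserver, DeferredSource) is a hypothetical stand-in written for illustration; this is not RxJava's actual implementation, and it ignores most threading concerns.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

// Illustrative only: simplified stand-ins, NOT the RxJava types.
class DisposableHolderSketch {

    /** Hypothetical stand-in for a cancellation handle. */
    interface SimpleDisposable {
        void dispose();
        boolean isDisposed();
    }

    /** Hypothetical stand-in for a single-value observer. */
    interface SimpleSingleObserver<T> {
        void onSubscribe(SimpleDisposable d);
        void onSuccess(T value);
        void onError(Throwable e);
    }

    /** Observer that stores the upstream handle and is itself disposable. */
    abstract static class HoldingObserver<T>
            implements SimpleSingleObserver<T>, SimpleDisposable {
        private final AtomicReference<SimpleDisposable> upstream = new AtomicReference<>();
        private final AtomicBoolean disposed = new AtomicBoolean();

        @Override
        public final void onSubscribe(SimpleDisposable d) {
            upstream.set(d);                  // remember the handle for later
            if (disposed.get()) d.dispose();  // already cancelled before subscribing
        }

        @Override
        public final void dispose() {
            if (disposed.compareAndSet(false, true)) {
                SimpleDisposable d = upstream.get();
                if (d != null) d.dispose();   // forward cancellation upstream
            }
        }

        @Override
        public final boolean isDisposed() {
            return disposed.get();
        }
    }

    /** Hypothetical source whose subscribe() returns void and emits only on request. */
    static final class DeferredSource<T> {
        private final T value;
        private final AtomicBoolean cancelled = new AtomicBoolean();
        private SimpleSingleObserver<T> observer;

        DeferredSource(T value) {
            this.value = value;
        }

        void subscribe(SimpleSingleObserver<T> o) {
            observer = o;
            o.onSubscribe(new SimpleDisposable() {
                @Override public void dispose() { cancelled.set(true); }
                @Override public boolean isDisposed() { return cancelled.get(); }
            });
        }

        void emit() {
            if (observer != null && !cancelled.get()) observer.onSuccess(value);
        }
    }

    /** Subscribes, optionally disposes before emission, and returns what was observed. */
    static List<String> runDemo(boolean disposeFirst) {
        List<String> log = new ArrayList<>();
        DeferredSource<String> source = new DeferredSource<>("Hello World");
        HoldingObserver<String> observer = new HoldingObserver<String>() {
            @Override public void onSuccess(String v) { log.add("Success: " + v); }
            @Override public void onError(Throwable e) { log.add("Error"); }
        };
        source.subscribe(observer);
        if (disposeFirst) observer.dispose(); // cancel through the observer itself
        source.emit();
        return log;
    }
}
```

Calling runDemo(false) lets the value through, while runDemo(true) disposes the observer first, so the source skips the emission. The point of the sketch is only that the caller cancels through the observer object it already holds, which is the role DisposableSingleObserver plays with subscribeWith in the snippet above.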
Upvotes: 1