Reputation: 701
I am struggling with implementing RxJava together with Socket.io. I have a problem subscribing again to the same observable, so I wrote a simpler example that makes the issue easier to follow.
Here is the function:
private void testObservable() throws InterruptedException {
    io.reactivex.Observable observable = io.reactivex.Observable.create(new ObservableOnSubscribe() {
        @Override
        public void subscribe(ObservableEmitter e) throws Exception {
            for (int i = 0; i < 20; i++) {
                java.lang.Thread.sleep(100);
                e.onNext(i);
                Timber.d("TESTOBSERVER emit %d", i);
            }
            e.onComplete();
        }
    });

    DisposableObserver observer = new DisposableObserver<Integer>() {
        @Override
        public void onNext(Integer value) {
            Timber.d("TESTOBSERVER observed: %d", value);
        }

        @Override
        public void onError(Throwable e) {
        }

        @Override
        public void onComplete() {
        }
    };

    observable.subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeWith(observer);
    Timber.d("TESTOBSERVER subscribe 1");
    java.lang.Thread.sleep(500);
    observer.dispose();
    Timber.d("TESTOBSERVER dispose 1");
    java.lang.Thread.sleep(500);

    observable.subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeWith(observer);
    Timber.d("TESTOBSERVER subscribe 2");
    java.lang.Thread.sleep(500);
    observer.dispose();
    Timber.d("TESTOBSERVER dispose 2");
}
And here is what it prints:
06-07 15:02:49.851 12649-12649/com.skunkaga.Skunkaga D/ThreadListPresenter: TESTOBSERVER subscribe 1
06-07 15:02:49.951 12649-12840/com.skunkaga.Skunkaga D/ThreadListPresenter: TESTOBSERVER emit 0
06-07 15:02:50.051 12649-12840/com.skunkaga.Skunkaga D/ThreadListPresenter: TESTOBSERVER emit 1
06-07 15:02:50.161 12649-12840/com.skunkaga.Skunkaga D/ThreadListPresenter: TESTOBSERVER emit 2
06-07 15:02:50.261 12649-12840/com.skunkaga.Skunkaga D/ThreadListPresenter: TESTOBSERVER emit 3
06-07 15:02:50.351 12649-12649/com.skunkaga.Skunkaga D/ThreadListPresenter: TESTOBSERVER dispose 1
06-07 15:02:50.861 12649-12649/com.skunkaga.Skunkaga D/ThreadListPresenter: TESTOBSERVER subscribe 2
06-07 15:02:51.361 12649-12649/com.skunkaga.Skunkaga D/ThreadListPresenter: TESTOBSERVER dispose 2
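I have two questions:
- Why doesn't it print TESTOBSERVER observed: 1?
- Why doesn't it continue emitting 9, 10, 11, 12, 13 after the second subscribe?
Upvotes: 1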
Views: 879
Reputation: 25603
- Why doesn't it print TESTOBSERVER observed: 1?
Presumably because testObservable() is executing on the Android main thread. The Thread.sleep() calls block that thread, so the values scheduled via AndroidSchedulers.mainThread() never get a chance to be delivered before the observer is disposed.
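To illustrate the idea without Android or RxJava, here is a minimal plain-Java sketch. It rests on some assumptions: a single-threaded executor stands in for the main looper, and an AtomicBoolean stands in for the observer's disposed flag. The class and variable names are made up for the illustration. Deliveries posted to a thread that is currently blocked can only run after it is free again, and by then the flag is already set.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class BlockedMainThreadDemo {

    // Runs the scenario and returns the recorded events in order.
    static List<String> run() throws InterruptedException {
        List<String> events = new CopyOnWriteArrayList<>();
        // Assumption: a single-threaded executor models the Android main looper.
        ExecutorService mainThread = Executors.newSingleThreadExecutor();
        AtomicBoolean disposed = new AtomicBoolean(false);
        CountDownLatch done = new CountDownLatch(1);

        // This task plays the role of testObservable() running on the main thread.
        mainThread.execute(() -> {
            try {
                // Background producer, like subscribeOn(Schedulers.newThread()).
                Thread producer = new Thread(() -> {
                    for (int i = 0; i < 3; i++) {
                        final int value = i;
                        events.add("emit " + value);
                        // Like observeOn(mainThread()): queue the delivery on the "main thread".
                        mainThread.execute(() -> {
                            if (!disposed.get()) {
                                events.add("observed " + value);
                            }
                        });
                    }
                });
                producer.start();
                // Blocks the "main thread", just as Thread.sleep(500) does in the question.
                producer.join();
                disposed.set(true); // like observer.dispose()
                events.add("dispose");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                done.countDown();
            }
        });

        done.await();
        mainThread.shutdown(); // already queued deliveries still run
        mainThread.awaitTermination(5, TimeUnit.SECONDS);
        return events;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run()); // prints [emit 0, emit 1, emit 2, dispose]
    }
}
```

The queued deliveries do run eventually, but only after "dispose" has been recorded, so no "observed" entry ever appears. This matches the log in the question.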
- Why doesn't it continue emitting 9, 10, 11, 12, 13 after the second subscribe?
Because Observables are not stateful. Every .subscribe() call is independent of all others, so a new subscription would start again from 0 rather than resume where the previous one stopped.
The reason it does not emit anything at all is that a DisposableObserver cannot be reused. Since it has already been subscribed and disposed, it will not subscribe to the observable again.
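Both points can be sketched in plain Java. This is a simplified model, not the RxJava source: OneShotObserver, Subscription and the subscribe helper are hypothetical names made up for the illustration. The assumption is that a disposed observer cancels any later subscription immediately, and that a cold source restarts from 0 for every subscriber.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

public class OneShotObserverDemo {

    /** Hypothetical stand-in for a Disposable: a handle that can be cancelled. */
    static final class Subscription {
        volatile boolean cancelled;
        void cancel() { cancelled = true; }
    }

    /** Sentinel marking an observer that has been disposed. */
    static final Subscription DISPOSED = new Subscription();

    /** Simplified model of an observer that accepts only one subscription. */
    static final class OneShotObserver {
        final AtomicReference<Subscription> upstream = new AtomicReference<>();
        final List<Integer> received = new ArrayList<>();

        void onSubscribe(Subscription s) {
            // Only the first subscription is stored; any later one is cancelled at once.
            if (!upstream.compareAndSet(null, s)) {
                s.cancel();
            }
        }

        void onNext(int value) { received.add(value); }

        void dispose() {
            Subscription s = upstream.getAndSet(DISPOSED);
            if (s != null && s != DISPOSED) {
                s.cancel();
            }
        }
    }

    /** Cold source: every call starts counting from 0 again. */
    static void subscribe(OneShotObserver observer, int count) {
        Subscription s = new Subscription();
        observer.onSubscribe(s);
        for (int i = 0; i < count && !s.cancelled; i++) {
            observer.onNext(i);
        }
    }

    /** Reuses a disposed observer: the second subscription delivers nothing. */
    static List<Integer> reuseAfterDispose() {
        OneShotObserver observer = new OneShotObserver();
        subscribe(observer, 3);
        observer.dispose();
        subscribe(observer, 3); // cancelled immediately, nothing is added
        return observer.received;
    }

    /** Uses a fresh observer: values arrive again, starting from 0. */
    static List<Integer> freshObserver() {
        OneShotObserver first = new OneShotObserver();
        subscribe(first, 3);
        first.dispose();
        OneShotObserver second = new OneShotObserver();
        subscribe(second, 3);
        return second.received;
    }

    public static void main(String[] args) {
        System.out.println(reuseAfterDispose()); // prints [0, 1, 2]
        System.out.println(freshObserver());     // prints [0, 1, 2]
    }
}
```

In terms of the code in the question, this suggests creating a new DisposableObserver for each subscribeWith() call instead of passing the same instance twice. Even then, the second subscription would begin at 0, not at 9.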
Upvotes: 2