Simon Harvan
Simon Harvan

Reputation: 701

RxJava continue emitting after subscribing again

i am struggling with implementation of RxJava with Socket.io. I have a problem to subscribe again to same observable, so i made an example and bit clearer code to understand.

here is the function:

private void testObservable() throws InterruptedException {
    io.reactivex.Observable observable = io.reactivex.Observable.create(new ObservableOnSubscribe() {
        @Override
        public void subscribe(ObservableEmitter e) throws Exception {
            for (int i = 0; i < 20; i++) {
                java.lang.Thread.sleep(100);
                e.onNext(i);
                Timber.d("TESTOBSERVER emit %d", i);
            }
            e.onComplete();
        }
    });
    DisposableObserver observer = new DisposableObserver<Integer>() {
        @Override
        public void onNext(Integer value) {
            Timber.d("TESTOBSERVER observed: %d", value);
        }

        @Override
        public void onError(Throwable e) {

        }

        @Override
        public void onComplete() {

        }
    };
    observable.subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeWith(observer);
    Timber.d("TESTOBSERVER subscribe 1");

    java.lang.Thread.sleep(500);
    observer.dispose();

    Timber.d("TESTOBSERVER dispose 1");

    java.lang.Thread.sleep(500);
    observable.subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeWith(observer);
    Timber.d("TESTOBSERVER subscribe 2");
    java.lang.Thread.sleep(500);
    observer.dispose();
    Timber.d("TESTOBSERVER dispose 2");
}

and here is what it prints

06-07 15:02:49.851 12649-12649/com.skunkaga.Skunkaga D/ThreadListPresenter: TESTOBSERVER subscribe 1
06-07 15:02:49.951 12649-12840/com.skunkaga.Skunkaga D/ThreadListPresenter: TESTOBSERVER emit 0
06-07 15:02:50.051 12649-12840/com.skunkaga.Skunkaga D/ThreadListPresenter: TESTOBSERVER emit 1
06-07 15:02:50.161 12649-12840/com.skunkaga.Skunkaga D/ThreadListPresenter: TESTOBSERVER emit 2
06-07 15:02:50.261 12649-12840/com.skunkaga.Skunkaga D/ThreadListPresenter: TESTOBSERVER emit 3
06-07 15:02:50.351 12649-12649/com.skunkaga.Skunkaga D/ThreadListPresenter: TESTOBSERVER dispose 1
06-07 15:02:50.861 12649-12649/com.skunkaga.Skunkaga D/ThreadListPresenter: TESTOBSERVER subscribe 2
06-07 15:02:51.361 12649-12649/com.skunkaga.Skunkaga D/ThreadListPresenter: TESTOBSERVER dispose 2

I have two questions:

  1. Why it doesn't print out TESTOBSERVER observed: 1?
  2. Why after second subscribe it doesn't continue with emitting 9,10,11,12,13?

Upvotes: 1

Views: 879

Answers (1)

Kiskae
Kiskae

Reputation: 25603

  1. Why it doesn't print out TESTOBSERVER observed: 1?

Presumably because testObservable() is executing on the android main thread. So there is never a chance to observe values using AndroidSchedulers.mainThread() before it is cancelled.

  1. Why after second subscribe it doesn't continue with emitting 9,10,11,12,13?

Because Observables are not stateful. Every .subscribe() call is independant of all others. The reason it does not emit anything is because Observer cannot be reused. So since it was already subscribed and disposed it will not subscribe to the observable.

Upvotes: 2

Related Questions