Maksym

Reputation: 375

Can't use the same DisposableObserver with subscribeWith twice after dispose()

The code (Kotlin) is very simple, but I don't understand why I can't use the same object to subscribe again.

val x = Observable.interval(1L, TimeUnit.SECONDS, Schedulers.io())
    .map {
        println("emitting=$it")
        it.toString()
    }.publish().autoConnect()

val o = object : DisposableObserver<String>() {
    override fun onComplete() {}
    override fun onNext(t: String) = println("O:=$t")
    override fun onError(e: Throwable) {}
}
println("---------- subscribe ----------")
val s2 = x.subscribeWith(o)

sleepSeconds(2)
println("---------- dispose ----------")
s2.dispose()

sleepSeconds(2)
println("---------- subscribe again ----------")
x.subscribeWith(o) // <-- this doesn't work: the observer receives nothing

sleepSeconds(5)

The console output is:

---------- subscribe ----------
emitting=0
O:=0
emitting=1
O:=1
---------- dispose ----------
emitting=2
emitting=3
---------- subscribe again ----------
emitting=4
emitting=5
etc.....

It works fine when I create a new instance of DisposableObserver for the second subscription.

Upvotes: 0

Views: 350

Answers (1)

chandra

Reputation: 186

DisposableObserver is designed to be subscribed only once (see its Javadoc). If you look under the hood of DisposableObserver, you will see an AtomicReference named upstream, which stores the current Disposable when the observer is subscribed.

public abstract class DisposableObserver<T> implements Observer<T>, Disposable {

    final AtomicReference<Disposable> upstream = new AtomicReference<Disposable>();

    @Override
    public final void onSubscribe(@NonNull Disposable d) {
        if (EndConsumerHelper.setOnce(this.upstream, d, getClass())) {
            onStart();
        }
    }
// rest of code

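EndConsumerHelper.setOnce makes sure a DisposableObserver is subscribed only once: it accepts a new upstream only while the reference is still null. After dispose() the reference holds the DISPOSED marker rather than null, so no other upstream can be set. Because of the check against DisposableHelper.DISPOSED, no double-subscription error is reported in that case, which is why your second subscribeWith fails silently.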

public static boolean setOnce(AtomicReference<Disposable> upstream, Disposable next, Class<?> observer) {
    ObjectHelper.requireNonNull(next, "next is null");
    if (!upstream.compareAndSet(null, next)) {
        next.dispose();  // an upstream was already set, so cancel the new one
        if (upstream.get() != DisposableHelper.DISPOSED) {
            // still actively subscribed: report the double subscription
            reportDoubleSubscription(observer);
        }
        return false;
    }
    return true;
}

That's why you cannot resubscribe with the same DisposableObserver instance, while subscribing with a new instance works.
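To see the set-once behavior in isolation, here is a minimal sketch that uses only the JDK and no RxJava. The names SetOnceSketch, Handle, SingleUseObserver and FakeUpstream are hypothetical stand-ins invented for illustration; they only imitate the compare-and-set logic shown above and are not the library's actual classes.

```java
import java.util.concurrent.atomic.AtomicReference;

public class SetOnceSketch {

    /** Hypothetical stand-in for RxJava's Disposable. */
    interface Handle {
        void dispose();
    }

    /** Sentinel playing the role of DisposableHelper.DISPOSED. */
    static final Handle DISPOSED = () -> { };

    /** Hypothetical single-use observer, loosely modeled on DisposableObserver. */
    static class SingleUseObserver {
        private final AtomicReference<Handle> upstream = new AtomicReference<>();

        /** Accepts the upstream only if none was ever set; otherwise cancels it. */
        boolean onSubscribe(Handle next) {
            if (!upstream.compareAndSet(null, next)) {
                next.dispose(); // reject the new upstream immediately
                return false;
            }
            return true;
        }

        /** Cancels the current upstream and leaves the slot permanently non-null. */
        void dispose() {
            Handle current = upstream.getAndSet(DISPOSED);
            if (current != null && current != DISPOSED) {
                current.dispose();
            }
        }
    }

    /** Minimal upstream that records whether it was cancelled. */
    static class FakeUpstream implements Handle {
        volatile boolean cancelled;

        @Override
        public void dispose() {
            cancelled = true;
        }
    }

    public static void main(String[] args) {
        SingleUseObserver o = new SingleUseObserver();
        FakeUpstream first = new FakeUpstream();
        FakeUpstream second = new FakeUpstream();

        System.out.println(o.onSubscribe(first));  // true: first subscription is accepted
        o.dispose();                               // slot now holds DISPOSED, not null
        System.out.println(o.onSubscribe(second)); // false: same instance is rejected
        System.out.println(second.cancelled);      // true: the new upstream was cancelled

        // A fresh instance starts with an empty slot, so it can subscribe again.
        System.out.println(new SingleUseObserver().onSubscribe(new FakeUpstream())); // true
    }
}
```

In the question's code, the equivalent fix is to build a new DisposableObserver for each subscribeWith call, for example by moving the object expression into a small factory function.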

Upvotes: 1
