Reputation: 375
The code (Kotlin) is very simple, but I don't understand, why I can't use the same object to subscribe again?
val x = Observable.interval(1L, TimeUnit.SECONDS, Schedulers.io())
.map {
println("emitting=$it")
it.toString()
}.publish().autoConnect()
val o = object : DisposableObserver<String>() {
override fun onComplete() {}
override fun onNext(t: String) = println("O:=$t")
override fun onError(e: Throwable) {}
}
println("---------- subscribe ----------")
val s2 = x.subscribeWith(o)
sleepSeconds(2)
println("---------- dispose ----------")
s2.dispose()
sleepSeconds(2)
println("---------- subscribe again ----------")
x.subscribeWith(o) //<<-- This doesn't work!!!!
sleepSeconds(5)
The console output is:
---------- subscribe ----------
emitting=0
O:=0
emitting=1
O:=1
---------- dispose ----------
emitting=2
emitting=3
---------- subscribe again ----------
emitting=4
emitting=5
etc.....
It works well, when I create a new instance of class DisposableObserver.
Upvotes: 0
Views: 350
Reputation: 186
DisposableObserver
is designed to only able subscribed once javadoc. When you look under hood of DisposableObserver, you will see AtomicReference upstream
which saving current disposable when observer subscribing.
public abstract class DisposableObserver<T> implements Observer<T>, Disposable {
final AtomicReference<Disposable> upstream = new AtomicReference<Disposable>();
@Override
public final void onSubscribe(@NonNull Disposable d) {
if (EndConsumerHelper.setOnce(this.upstream, d, getClass())) {
onStart();
}
}
// rest of code
EndConsumerHelper.setOnce
make sure DisposableObserver subscribed only once. If upstream
has been disposed, then no other upstream
could be set.
public static boolean setOnce(AtomicReference<Disposable> upstream, Disposable next, Class<?> observer) {
ObjectHelper.requireNonNull(next, "next is null");
if (!upstream.compareAndSet(null, next)) {
next.dispose(); // dispose next if there is set upstream previously
if (upstream.get() != DisposableHelper.DISPOSED) {
reportDoubleSubscription(observer);
}
return false;
}
return true;
}
That's why you cannot resubscribe with same DisposableObserver instance, but will work using new instance.
Upvotes: 1