Reputation: 451
Hi, guys. I'm trying to build a kind of MVC with RxJava. The idea is to have long-lived subscribers that always stay subscribed to some observable, while the observable itself can be restarted at any time, for example to rerun a network call. I wrote this code to test the idea:
import java.util.concurrent.TimeUnit
import rx.Observable
import rx.schedulers.Schedulers

class Main {
    companion object {
        @JvmStatic
        fun main(args: Array<String>) {
            val obs = Observable.interval(1, TimeUnit.SECONDS, Schedulers.io())
                .publish()
            val s1 = obs
                .doOnUnsubscribe { System.out.println("s1 unsubscribed") }
                .subscribe { System.out.println("first: $it") }
            val s = obs.connect()
            Thread.sleep(4000)
            System.out.println("unsubscribe")
            s.unsubscribe()
            Thread.sleep(1000)
            System.out.println("connect")
            val obsS2 = obs.connect()
            System.out.println("isUnsubscribed: ${s1.isUnsubscribed}")
            Thread.sleep(10000)
        }
    }
}
This is what I expected:
first: 0
first: 1
first: 2
unsubscribe
connect
isUnsubscribed: false
first: 0
first: 1
...
Process finished with exit code 0
This is the actual result:
first: 0
first: 1
first: 2
unsubscribe
connect
isUnsubscribed: false
Process finished with exit code 0
I found some posts (RxJava - ConnectableObservable, disconnecting and reconnecting, https://github.com/Froussios/Intro-To-RxJava/issues/18) where people say that this is a bug, but if so, it has been present for quite a long time.
So the question: is it really a bug? And if not, how can I achieve this behavior?
Edited: Tested on versions 1.3.4, 1.2.10, 1.1.10, 1.0.10
Upvotes: 1
Views: 549
Reputation: 69997
It's not a bug but a property of RxJava's ConnectableObservable: if you unsubscribe the connection, the previously subscribed consumers will be ejected and won't receive any further events, even when reconnected.
You can achieve a similar effect to Rx.NET's behavior by using a PublishSubject, then subscribing and unsubscribing it to the actual source:
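// The subject is the stable endpoint that long-lived consumers subscribe to.
// Long is assumed here to match the question's Observable.interval source.
PublishSubject<Long> subject = PublishSubject.create();
subject.subscribe(System.out::println);
// "Connect": forward the source's items into the subject.
Subscription s = source.subscribe(subject::onNext);
// "Disconnect": stops the source; the subject's consumers stay subscribed.
s.unsubscribe();
// "Reconnect": a fresh subscription restarts a cold source.
Subscription s2 = source.subscribe(subject::onNext);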
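For a version you can run end to end, here is a minimal sketch of the same relay idea. It assumes RxJava 1.x is on the classpath. The names RelayDemo, run and relay are made up for illustration, and Observable.range is only a synchronous stand-in for a restartable source such as a network call. The consumer subscribes once to the subject and keeps receiving items each time the source is subscribed again.

```java
import java.util.ArrayList;
import java.util.List;

import rx.Observable;
import rx.Subscription;
import rx.subjects.PublishSubject;

public class RelayDemo {

    // Hypothetical helper: records what one long-lived consumer sees
    // while the source is subscribed ("connected") twice.
    static List<String> run() {
        List<String> seen = new ArrayList<>();

        // Cold, synchronous stand-in for a restartable source (assumption).
        Observable<Integer> source = Observable.range(0, 3);

        // The subject is the stable endpoint; consumers subscribe to it once.
        PublishSubject<Integer> relay = PublishSubject.create();
        Subscription consumer = relay.subscribe(v -> seen.add("first: " + v));

        // First "connection": only onNext is forwarded, so the source's
        // completion does not terminate the subject.
        Subscription first = source.subscribe(relay::onNext);
        first.unsubscribe();

        seen.add("reconnect");

        // Second "connection": the cold source starts over from 0.
        Subscription second = source.subscribe(relay::onNext);
        second.unsubscribe();

        // The consumer was never ejected.
        seen.add("isUnsubscribed: " + consumer.isUnsubscribed());
        return seen;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Because only onNext is forwarded, an error from the source is not relayed to the subject's consumers. If the source can fail, you would probably want to pass an error handler to source.subscribe as well.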
Upvotes: 1