Vlad Kudoyar
Vlad Kudoyar

Reputation: 451

RxJava reconnection on connectable observable does not work

Hi, guys. I'm trying to make some kind of MVC with RxJava. So the idea is to make some constant subscriptions that will be always subscribed to some observable. Also this observable can be restarted at any time to rerun, for example a network call. I tried this code to test this feature:

class Main {
    companion object {
        @JvmStatic
        fun main(args: Array<String>) {
            val obs = Observable.interval(1, TimeUnit.SECONDS, Schedulers.io())
                .publish()

            val s1 = obs
                .doOnUnsubscribe { System.out.println("s1 unsubscribed") }
                .subscribe { System.out.println("first: $it") }

            val s = obs.connect()

            Thread.sleep(4000)

            System.out.println("unsubscribe")
            s.unsubscribe()

            Thread.sleep(1000)

            System.out.println("connect")
            val obsS2 = obs.connect()

            System.out.println("isUnsubscribed: ${s1.isUnsubscribed}")

            Thread.sleep(10000)
        }
    }
}

This is what I expected:

first: 0
first: 1
first: 2
unsubscribe
connect
isUnsubscribed: false
first: 0
first: 1
...
Process finished with exit code 0

This is the actual result:

first: 0
first: 1
first: 2
unsubscribe
connect
isUnsubscribed: false

Process finished with exit code 0

I found some posts (RxJava - ConnectableObservable, disconnecting and reconnecting, https://github.com/Froussios/Intro-To-RxJava/issues/18), where people say that it is a bug, but this bug is present for quite a long time.

So the question: is it realy a bug? And if not, how can I achive such behavior?

Edited: Tested on versions 1.3.4, 1.2.10, 1.1.10, 1.0.10

Upvotes: 1

Views: 549

Answers (1)

akarnokd
akarnokd

Reputation: 69997

It's not a bug but a property of RxJava's ConnectableObservable: if you unsubscribe the connection, the previously subscribed consumers will be ejected and won't receive any further events, even when reconnected.

You can achieve a similar effect to Rx.NET's behavior by using a PublishSubject, then subscribing and unsubscribing it to the actual source:

PublishSubject subject = PublishSubject.create();

subject.subscribe(System.out::println);

Subscription s = source.subscribe(subject::onNext);

s.unsubscribe();

Subscription s2 = source.subscribe(subject::onNext);

Upvotes: 1

Related Questions