Hadi

Reputation: 574

Why doesn't subscribeOn have any effect on a PublishSubject in RxJava?

This is my test code in Kotlin:

fun main() {
    rxjava()
}

fun rxjava() {
    val queuSubject = PublishSubject.create<String>()
    queuSubject
        .map { t ->
            val a = t.toLong()
            Thread.sleep(6000 / a)
            println("map $a called ${Thread.currentThread().name} ")
            a
        }
        .subscribeOn(Schedulers.io())
        .observeOn(Schedulers.io())
        .subscribe({
            println("thread in subscription ${Thread.currentThread().name}")
        }, {
            println("error ${it.message}")
        })
    for (i in 1..3) {
        Thread {
            queuSubject.onNext("$i")
        }.start()
    }
    Thread.sleep(15000)
}

I'm trying to run the map block and the subscriber's onNext block on different IO threads, but the output looks like this:

map 3 called Thread-2 
thread in subscription RxCachedThreadScheduler-2
map 2 called Thread-1 
thread in subscription RxCachedThreadScheduler-2
map 1 called Thread-0 
thread in subscription RxCachedThreadScheduler-2

As you can see, calling subscribeOn seems to have no effect on the PublishSubject's stream: Thread-0, Thread-1 and Thread-2 are the threads that call onNext.

Additionally, consider the code below:

fun main() {
    rxjava()
}

fun rxjava() {
    val queuSubject = PublishSubject.create<String>()
    queuSubject
        .map { t ->
            val a = t.toLong()
            Thread.sleep(6000 / a)
            println("map $a called ${Thread.currentThread().name} ")
            a
        }
        .subscribeOn(Schedulers.io())
        .observeOn(Schedulers.io())
        .subscribe({
            println("thread in subscription ${Thread.currentThread().name}")
        }, {
            println("error ${it.message}")
        })
    queuSubject.onNext("1")
    queuSubject.onNext("2")
    queuSubject.onNext("3")
    Thread.sleep(15000)
}

With the code above, no output is printed at all. But if I remove subscribeOn from the stream, the messages are printed sequentially, like this:

map 1 called main 
thread in subscription RxCachedThreadScheduler-1
map 2 called main 
thread in subscription RxCachedThreadScheduler-1
map 3 called main 
thread in subscription RxCachedThreadScheduler-1

What is wrong with these two snippets? Thanks.

Upvotes: 1

Views: 520

Answers (1)

akarnokd

Reputation: 70017

Because subscribeOn only affects the subscription side-effects of a source. An example of such a side-effect is a source that starts emitting events as soon as an observer subscribes:

Observable.just(1, 2, 3)
    .subscribeOn(Schedulers.io())
    // just() emits during subscription, so these items are printed on an io thread
    .doOnNext(v -> System.out.println(Thread.currentThread() + " - " + v))
    .blockingSubscribe();

PublishSubject has no subscription side-effects, as it only relays signals from its onXXX methods to the observers' onXXX methods.

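However, subscribeOn does have a timing effect: it delays the actual subscription to the source. In the case of PublishSubject, the observer might therefore not be registered yet by the time some other thread calls its onXXX methods. This is most likely why your second snippet prints nothing: the three onNext calls on the main thread run before the subscription scheduled on the io thread has reached the subject, and a PublishSubject does not replay items to late observers.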
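
To make the timing effect concrete, here is a minimal sketch. It assumes RxJava 3 package names (for RxJava 2, the imports start with io.reactivex instead), and the function name emitAroundDelayedSubscription is made up for this illustration. It emits one item right after subscribing and another one after polling hasObservers() until the delayed subscription has actually reached the subject:

```kotlin
import io.reactivex.rxjava3.schedulers.Schedulers
import io.reactivex.rxjava3.subjects.PublishSubject
import java.util.concurrent.CopyOnWriteArrayList

// Hypothetical helper: returns the items the observer actually received.
fun emitAroundDelayedSubscription(): List<String> {
    val received = CopyOnWriteArrayList<String>()
    val subject = PublishSubject.create<String>()

    val disposable = subject
        .subscribeOn(Schedulers.io()) // the subject is subscribed to later, from an io thread
        .subscribe { received.add(it) }

    // Emitted immediately: the observer is probably not registered yet,
    // so this item is usually dropped (it is a race, not a guarantee).
    subject.onNext("early")

    // Block until the delayed subscription has reached the subject.
    while (!subject.hasObservers()) {
        Thread.sleep(10)
    }

    // The observer is registered now, so this item is relayed,
    // synchronously and on the calling thread.
    subject.onNext("late")

    disposable.dispose()
    return received
}

fun main() {
    println(emitAroundDelayedSubscription())
}
```

Polling hasObservers() is only meant to expose the race; in real code it is usually simpler to drop subscribeOn on a subject and rely on observeOn instead.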

If you want to move the processing off of the original thread, use observeOn:

    val queuSubject = PublishSubject.create<String>()
    queuSubject
        .observeOn(Schedulers.io()) // <-- moves map and everything below it onto an io thread
        .map { t ->
            val a = t.toLong()
            Thread.sleep(6000 / a)
            println("map $a called ${Thread.currentThread().name} ")
            a
        }
        .observeOn(Schedulers.io())
        .subscribe({
            println("thread in subscription ${Thread.currentThread().name}")
        }, {
            println("error ${it.message}")
        })
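
If you want to check which threads end up running each stage, the following sketch records the thread names instead of printing them. It again assumes RxJava 3 package names, and observeOnThreads is a made-up name for this illustration, not part of any API:

```kotlin
import io.reactivex.rxjava3.schedulers.Schedulers
import io.reactivex.rxjava3.subjects.PublishSubject
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicReference

// Hypothetical helper: returns (thread name seen in map, thread name seen in subscribe).
fun observeOnThreads(): Pair<String, String> {
    val subject = PublishSubject.create<String>()
    val done = CountDownLatch(1)
    val mapThread = AtomicReference("")
    val subscribeThread = AtomicReference("")

    val disposable = subject
        .observeOn(Schedulers.io()) // hop off the thread that calls onNext
        .map { t ->
            mapThread.set(Thread.currentThread().name)
            t.toLong()
        }
        .observeOn(Schedulers.io()) // second hop, for the subscriber
        .subscribe {
            subscribeThread.set(Thread.currentThread().name)
            done.countDown()
        }

    // No subscribeOn here, so the observer is registered synchronously
    // and this item cannot be missed.
    subject.onNext("1")

    // Wait for the item to travel through both observeOn hops.
    done.await(5, TimeUnit.SECONDS)
    disposable.dispose()
    return mapThread.get() to subscribeThread.get()
}

fun main() {
    val (mapThread, subscribeThread) = observeOnThreads()
    println("map ran on $mapThread, subscribe ran on $subscribeThread")
}
```

Both names should start with RxCachedThreadScheduler rather than main or Thread-N. Note that each observeOn processes its items one at a time on a single worker, so the map calls still run sequentially, just not on the thread that called onNext.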

Upvotes: 3
