DinDoit

Reputation: 11

Does subscribeOn for combineLatest do something?

Is there any sense in adding subscribeOn() after Observable.combineLatest() like so:

Observable.combineLatest(
  someObservable,
  theOtherObservable,
  (something, theOther) -> iAmFunction(something, theOther))
  .subscribeOn(Schedulers.computation())
  ...

As I understand it, iAmFunction() will be called on whatever Scheduler the source Observable that emitted last is emitting on.

So what's the purpose of that subscribeOn() at the end?

Upvotes: 1

Views: 1211

Answers (2)

akarnokd

Reputation: 70007

subscribeOn specifies where the subscription side-effects will happen and does not guarantee you get items on the thread it uses. Use observeOn instead:

Observable.combineLatest(
    someObservable.observeOn(Schedulers.single()),
    theOtherObservable.observeOn(Schedulers.single()),
    (something, theOther) -> iAmFunction(something, theOther)
)
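To make the difference concrete, here is a minimal plain-Java sketch. It is a toy model, not RxJava: ToyCombiner, named and run are hypothetical names, and an ExecutorService stands in for a Scheduler. It assumes a combine step that simply runs on whichever thread delivers the item that completes the pair, and it shows that re-dispatching both sources to one single-thread executor (roughly the role observeOn(Schedulers.single()) plays above) pins the combiner to that thread.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class CombinerThreadSketch {

    /** Toy combine step: runs on whichever thread calls onLeft/onRight. */
    static final class ToyCombiner {
        private String left;
        private String right;
        final List<String> combinerThreads = new CopyOnWriteArrayList<>();

        synchronized void onLeft(String value) { left = value; combine(); }
        synchronized void onRight(String value) { right = value; combine(); }

        private void combine() {
            // Like combineLatest, nothing is combined until both sides have a value.
            if (left != null && right != null) {
                combinerThreads.add(Thread.currentThread().getName());
            }
        }
    }

    /** Single-thread executor with a readable thread name (stand-in for a Scheduler). */
    static ExecutorService named(String name) {
        return Executors.newSingleThreadExecutor(task -> new Thread(task, name));
    }

    /** Emits one item per source; with hopToSingle the items are handed to "single" first. */
    static List<String> run(boolean hopToSingle) throws InterruptedException {
        ToyCombiner combiner = new ToyCombiner();
        ExecutorService sourceA = named("source-a");
        ExecutorService sourceB = named("source-b");
        ExecutorService single = named("single");

        if (hopToSingle) {
            sourceA.execute(() -> single.execute(() -> combiner.onLeft("a")));
            sourceB.execute(() -> single.execute(() -> combiner.onRight("b")));
        } else {
            sourceA.execute(() -> combiner.onLeft("a"));
            sourceB.execute(() -> combiner.onRight("b"));
        }

        // Let both sources finish before closing the executor they hand off to.
        sourceA.shutdown();
        sourceB.shutdown();
        sourceA.awaitTermination(1, TimeUnit.SECONDS);
        sourceB.awaitTermination(1, TimeUnit.SECONDS);
        single.shutdown();
        single.awaitTermination(1, TimeUnit.SECONDS);
        return combiner.combinerThreads;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("direct:     " + run(false)); // one of the source threads
        System.out.println("via single: " + run(true));  // the "single" thread
    }
}
```

In the direct case the recorded thread depends on which source happens to deliver last, which is why a thread hop placed in front of the combine step is the more predictable choice in this model.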

Upon subscription, combineLatest subscribes to its sources, so with the subscribeOn in your code you transitively subscribe to someObservable and theOtherObservable on the computation() Scheduler's thread. Note, however, that if someObservable does not give control back, theOtherObservable will not get subscribed to. It is always better to specify subscribeOn as close to the source(s) as possible:

Observable.combineLatest(
    someObservable.subscribeOn(Schedulers.computation()),
    theOtherObservable.subscribeOn(Schedulers.computation()),
    (something, theOther) -> iAmFunction(something, theOther)
)
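The caveat about a source that does not give control back can be illustrated with a small plain-Java sketch. This is a toy model rather than RxJava: ToySource and secondGetsSubscribed are hypothetical names, a single-thread executor stands in for the Scheduler thread, and the 300 ms timeout is arbitrary.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SubscribePlacementSketch {

    /** Hypothetical stand-in for a source whose subscribe() runs on the caller's thread. */
    interface ToySource {
        void subscribe() throws InterruptedException;
    }

    /**
     * Subscribes to a blocking first source, then to a second one.
     * sharedThread == true: both subscriptions are queued on one worker
     * (like a single subscribeOn placed after the combine step).
     * sharedThread == false: each source gets its own worker
     * (like subscribeOn applied to each source).
     * Returns whether the second source was subscribed within the timeout.
     */
    static boolean secondGetsSubscribed(boolean sharedThread) throws InterruptedException {
        CountDownLatch never = new CountDownLatch(1);            // never counted down
        CountDownLatch secondSubscribed = new CountDownLatch(1);

        ToySource first = never::await;                          // never gives control back
        ToySource second = secondSubscribed::countDown;

        ExecutorService workerA = Executors.newSingleThreadExecutor();
        ExecutorService workerB = sharedThread ? workerA : Executors.newSingleThreadExecutor();
        try {
            workerA.submit(() -> { first.subscribe(); return null; });
            workerB.submit(() -> { second.subscribe(); return null; });
            return secondSubscribed.await(300, TimeUnit.MILLISECONDS);
        } finally {
            workerA.shutdownNow();   // interrupts the blocked worker so the JVM can exit
            workerB.shutdownNow();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("shared thread:   " + secondGetsSubscribed(true));  // false
        System.out.println("one thread each: " + secondGetsSubscribed(false)); // true
    }
}
```

On the shared worker the second subscription stays queued behind the first, which never returns. With one worker per source, the second subscription proceeds regardless.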

Upvotes: 4

Kh Jung

Reputation: 311

In this question, subscribeOn decides which thread executes iAmFunction(). Basically, subscribeOn decides the emitting thread.

the SubscribeOn operator designates which thread the Observable will begin operating on, no matter at what point in the chain of operators that operator is called.

For example, here are two combineLatest pipelines, one without subscribeOn and one with it:

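// Imports assume RxJava 2 with RxKotlin 2; adjust the packages for other versions.
import io.reactivex.Observable
import io.reactivex.rxkotlin.Observables
import io.reactivex.schedulers.Schedulers

fun main(args: Array<String>) {
    // No subscribeOn: everything runs on the calling thread.
    Observables.combineLatest(
        Observable.just("o1"),
        Observable.just("o2")
    ) { _, _ -> Thread.currentThread().name }
        .subscribe {
            println("Without subscribeOn")
            println("in combineLatest: $it")
            println("in subscribe: ${Thread.currentThread().name}")
        }

    println()

    // subscribeOn(io()): the subscription, and with it the synchronous emissions, move to an io() thread.
    Observables.combineLatest(
        Observable.just("o1"),
        Observable.just("o2")
    ) { _, _ -> Thread.currentThread().name }
        .subscribeOn(Schedulers.io())
        .subscribe {
            println("With subscribeOn")
            println("in combineLatest: $it")
            println("in subscribe: ${Thread.currentThread().name}")
        }

    // Keep main alive long enough for the io() thread to emit.
    Thread.sleep(500)
}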

Output:

Without subscribeOn
in combineLatest: main
in subscribe: main

With subscribeOn
in combineLatest: RxCachedThreadScheduler-1
in subscribe: RxCachedThreadScheduler-1
As you can see, subscribeOn changes the thread for both the combineLatest function and the subscribe callback. If you don't use observeOn, subscribeOn can affect everything from the emission of items through to subscribe.

Upvotes: 0
