Reputation: 11
Is there any sense in adding subscribeOn()
after Observable.combineLatest()
like so:
Observable.combineLatest(
someObservable,
theOtherObservable,
(something, theOther) -> iAmFunction(something, theOther)))
.subscribeOn(Schedulers.computation())
...
What I understand iAmFunction()
will be called on whatever Scheduler the being-combined-Observable emmit as last.
So what's the purpose of that subscribeOn()
at the end?
Upvotes: 1
Views: 1211
Reputation: 70007
subscribeOn
specifies where the subscription side-effects will happen and does not guarantee you get items on the thread it uses. Use observeOn
instead:
Observable.combineLatest(
someObservable.observeOn(Schedulers.single()),
theOtherObservable.observeOn(Schedulers.single()),
(something, theOther) -> iAmFunction(something, theOther))
)
What combineLatest
does upon subscription is to subscribe to its sources, thus transitively you subscribe to someObservable
and theOtherObservable
on the computation()
Scheduler's thread. Note, however, that if someObservable
does not give control back, theOtherObservable
will not get subscribed to. It is always better to specify subscribeOn
as close to the source(s) as possible:
Observable.combineLatest(
someObservable.subscribeOn(Schedulers.computation()),
theOtherObservable.subscribeOn(Schedulers.computation()),
(something, theOther) -> iAmFunction(something, theOther))
)
Upvotes: 4
Reputation: 311
In this question, the subscribeOn
decides which thread executes the iAmFunction()
(Link). Basically subscribeOn
decides emitting thread.
the SubscribeOn operator designates which thread the Observable will begin operating on, no matter at what point in the chain of operators that operator is called.
For example there are two combineLatest logics.
fun main(args: Array<String>) {
Observables.combineLatest(
Observable.just("o1"),
Observable.just("o2")
) { _, _ -> Thread.currentThread().name }
.subscribe {
println("Without subscribeOn")
println("in combineLatest: $it")
println("in subscribe: ${Thread.currentThread().name}")
}
println()
Observables.combineLatest(
Observable.just("o1"),
Observable.just("o2")
) { _, _ -> Thread.currentThread().name }
.subscribeOn(Schedulers.io())
.subscribe {
println("With subscribeOn")
println("in combineLatest: $it")
println("in subscribe: ${Thread.currentThread().name}")
}
Thread.sleep(500)
}
Without subscribeOn
in combineLatest: main
in subscribe: main
With subscribeOn
in combineLatest: RxCachedThreadScheduler-1
in subscribe: RxComputationThreadPool-1
As you can see subscribeOn
changes the thread for combineLatest
and subscribe
. If you don't use observeOn
, it can effect from emitting items
to subscribe
.
Upvotes: 0