Reputation: 148169
I am playing around with RxJava (RxKotlin, to be precise). Here I have the following Observables:
fun metronome(ms: Int) = observable<Int> {
    var i = 0
    while (true) {
        if (ms > 0) {
            Thread.sleep(ms.toLong())
        }
        if (it.isUnsubscribed()) {
            break
        }
        it.onNext(++i)
    }
}
And I'd like to have a few of them merged and running concurrently. They ignore backpressure so the backpressure operators have to be applied to them.
Then I create a scheduler backed by a fixed thread pool:
val cores = Runtime.getRuntime().availableProcessors()
val threads = Executors.newFixedThreadPool(cores)
val scheduler = Schedulers.from(threads)
And then I merge the metronomes:
val o = Observable.merge(
        listOf(metronome(0), metronome(1000).map { "---------" })
            .map { it.onBackpressureBlock().subscribeOn(scheduler) })
    .take(5000, TimeUnit.MILLISECONDS)
The first one is supposed to emit items incessantly. When I run this, the last 3 seconds of the run produce the following output:
...
[RxComputationThreadPool-5]: 369255
[RxComputationThreadPool-5]: 369256
[RxComputationThreadPool-5]: 369257
[RxComputationThreadPool-5]: ---------
[RxComputationThreadPool-5]: ---------
[RxComputationThreadPool-5]: ---------
It seems that the Observables are subscribed on the same thread, and the first observable is blocked for 3+ seconds.
But when I swap the onBackpressureBlock() and subscribeOn(scheduler) calls, the output becomes what I expected: the items from both observables are interleaved during the whole execution.
It's obvious to me that the order of calls matters in RxJava, but I don't quite understand what happens in this particular situation.
So what happens when the onBackpressureBlock operator is applied before subscribeOn, and what happens when it is applied after?
Upvotes: 0
Views: 293
Reputation: 70017
The onBackpressureBlock operator is a failed experiment; you have to be careful about where you apply it. For example, subscribeOn().onBackpressureBlock() works, but the reverse order does not.
RxJava has a non-blocking periodic timer called interval, so you don't need to roll your own.
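As a rough illustration of the interval suggestion, here is a minimal sketch, assuming RxJava 1.x (the `rx` package) called directly from Kotlin. The `metronome(ms: Long)` helper below is a hypothetical stand-in for the hand-rolled one in the question, and the 100 ms period is an arbitrary choice for the example, since interval is a timer and does not reproduce the tight loop of metronome(0).

```kotlin
import rx.Observable
import java.util.concurrent.TimeUnit

// Hypothetical replacement for the question's metronome, built on interval.
// interval emits 0, 1, 2, ... so 1 is added to mimic the original 1-based counter.
fun metronome(ms: Long): Observable<Long> =
    Observable.interval(ms, TimeUnit.MILLISECONDS).map { it + 1 }

fun main() {
    // Merge a fast counter with a slow separator line, for 5 seconds in total.
    val merged = Observable.merge(
        metronome(100).map { it.toString() },
        metronome(1000).map { "---------" }
    ).take(5000, TimeUnit.MILLISECONDS)

    // Block the main thread until the merged stream completes.
    merged.toBlocking().forEach { println(it) }
}
```

Because interval schedules its ticks on a scheduler instead of sleeping inside the subscribe call, this sketch needs neither Thread.sleep nor an explicit subscribeOn.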
Upvotes: 1