hotkey

Reputation: 148169

RxJava: onBackpressureBlock() strange behavior

I am playing around with RxJava (RxKotlin to be precise). Here I have the following Observables:

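fun metronome(ms: Int) = observable<Int> {
    var i = 0
    while (true) {
        // Blocks the subscribing thread between emissions; ms == 0 means emit as fast as possible
        if (ms > 0) {
            Thread.sleep(ms.toLong())
        }
        // Stop the loop once the subscriber has gone away
        if (it.isUnsubscribed()) {
            break
        }
        it.onNext(++i)
    }
}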

And I'd like to have a few of them merged and running concurrently. They ignore backpressure so the backpressure operators have to be applied to them.

Then I create a scheduler backed by a fixed thread pool:

val cores = Runtime.getRuntime().availableProcessors()
val threads = Executors.newFixedThreadPool(cores)
val scheduler = Schedulers.from(threads)

And then I merge the metronomes:

val o = Observable.merge(listOf(metronome(0),
                                metronome(1000).map { "---------" })
                         .map { it.onBackpressureBlock().subscribeOn(scheduler) })
                  .take(5000, TimeUnit.MILLISECONDS)

The first one is supposed to emit items incessantly. When I run this, the last 3 seconds of the run produce the following output:

...
[RxComputationThreadPool-5]: 369255
[RxComputationThreadPool-5]: 369256
[RxComputationThreadPool-5]: 369257
[RxComputationThreadPool-5]: ---------
[RxComputationThreadPool-5]: ---------
[RxComputationThreadPool-5]: ---------

It seems that the Observables are subscribed on one and the same thread, and that the first Observable is blocked for 3+ seconds.

But when I swap the onBackpressureBlock() and subscribeOn(scheduler) calls, the output becomes what I expected: the two streams are interleaved throughout the whole execution.

It's obvious to me that the order of calls matters in RxJava, but I don't quite understand what happens in this particular situation.

So what happens when the onBackpressureBlock operator is applied before subscribeOn, and what happens when it is applied after?

Upvotes: 0

Views: 293

Answers (1)

akarnokd

Reputation: 70017

The onBackpressureBlock operator is a failed experiment; you have to be careful about where you apply it. For example, subscribeOn().onBackpressureBlock() works, but onBackpressureBlock().subscribeOn() does not.

RxJava has a non-blocking periodic timer called interval, so you don't need to roll your own.
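
As a rough sketch of what that could look like, assuming RxJava 1.x (package rx) is on the classpath: the function name mergedTicks, the 10 ms and 25 ms periods, and the take() counts are made up for illustration and are not from the question. interval emits 0, 1, 2, ... rather than the question's 1, 2, 3, ..., and it runs on the computation scheduler by default, so no subscribeOn or hand-written sleep loop is needed here.

```kotlin
import rx.Observable
import java.util.concurrent.TimeUnit

// Hypothetical helper: merges a fast and a slow timer built with interval()
// and collects everything they emit into a list.
fun mergedTicks(): List<Any> {
    // Fast timer: three ticks (0, 1, 2), 10 ms apart
    val fast = Observable.interval(10, TimeUnit.MILLISECONDS).take(3)
    // Slow timer: two ticks, 25 ms apart, mapped to a marker string as in the question
    val slow = Observable.interval(25, TimeUnit.MILLISECONDS).map { "---------" }.take(2)
    // merge completes once both sources complete; toBlocking() waits for that
    return Observable.merge<Any>(fast, slow).toList().toBlocking().single()
}

fun main(args: Array<String>) {
    mergedTicks().forEach { println(it) }
}
```

Because neither timer blocks a thread while waiting, the two streams can interleave without any onBackpressureBlock call; the exact interleaving depends on timing.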

Upvotes: 1
