Will Li

Reputation: 615

How to get the actual latest event under backpressure in RxJava2? Flowable.onBackpressureLatest() does not work as expected

I have a producer that emits events faster than the consumer can process them.

I thought that by using Flowable with onBackpressureLatest(), the consumer would always receive the latest emitted event.

But it turns out there is a default buffer of size 128, so what I got were stale events that had been buffered earlier.

So how can I get the actual latest event?

Here's the sample code:

Flowable.interval(40, TimeUnit.MILLISECONDS)
    .doOnNext {
        println("doOnNext $it")
    }
    .onBackpressureLatest()
    .observeOn(Schedulers.single())
    .subscribe {
        println("subscribe $it")
        Thread.sleep(100)
    }

What I expected:

doOnNext    0
    subscribe   0
doOnNext    1
doOnNext    2
    subscribe   2
doOnNext    3
doOnNext    4
doOnNext    5
    subscribe   5
doOnNext    6
doOnNext    7
    subscribe   7
doOnNext    8
doOnNext    9
doOnNext    10
    subscribe   10
...

What I got:

doOnNext    0
    subscribe   0
doOnNext    1
doOnNext    2
    subscribe   1
doOnNext    3
doOnNext    4
doOnNext    5
    subscribe   2
doOnNext    6
doOnNext    7
    subscribe   3
doOnNext    8
doOnNext    9
doOnNext    10
    subscribe   4
...
doOnNext    325
    subscribe   127
doOnNext    326
doOnNext    327
doOnNext    328
    subscribe   246
...

Upvotes: 1

Views: 371

Answers (1)

Kiskae

Reputation: 25603

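Your problem actually lies in observeOn, which requests up to 128 items by default and passes this request upstream to onBackpressureLatest. Because that demand is outstanding, onBackpressureLatest forwards every item instead of dropping the old ones, and the items queue up inside observeOn, so it does not behave as you're expecting.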

You can use .observeOn(Scheduler, boolean, int) to specify the buffer size (the last argument), which should fix the behavior you're seeing.
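
As a minimal sketch of that suggestion, the code from the question could be changed as follows. The buffer size of 1 and delayError = false are my assumptions, not values taken from the question, and the main function with its 2-second sleep is only there so the example runs standalone (Schedulers.single() uses a daemon thread).

```kotlin
import io.reactivex.Flowable
import io.reactivex.schedulers.Schedulers
import java.util.concurrent.TimeUnit

fun main() {
    val disposable = Flowable.interval(40, TimeUnit.MILLISECONDS)
        .doOnNext { println("doOnNext $it") }
        .onBackpressureLatest()
        // delayError = false, bufferSize = 1 (assumed values):
        // observeOn now requests a single item at a time, so
        // onBackpressureLatest can drop everything but the newest one.
        .observeOn(Schedulers.single(), false, 1)
        .subscribe {
            println("    subscribe $it")
            Thread.sleep(100) // simulate a slow consumer
        }

    // Keep the JVM alive long enough to observe some output.
    Thread.sleep(2000)
    disposable.dispose()
}
```

With a buffer this small, the subscriber should skip ahead to a recent value after each sleep rather than walking through 1, 2, 3, ... in order. The exact values it prints depend on timing.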

Upvotes: 6
