Reputation: 615
I have a producer that emits events faster than the consumer can process them.
I thought that by using Flowable with onBackpressureLatest(), I would always get the latest emitted event.
But it turns out there's a default buffer of size 128, so what I got was a stale event that had been buffered earlier.
So how can I get the actual latest event?
Here's the sample code:
Flowable.interval(40, TimeUnit.MILLISECONDS)
    .doOnNext {
        println("doOnNext $it")
    }
    .onBackpressureLatest()
    .observeOn(Schedulers.single())
    .subscribe {
        println("subscribe $it")
        Thread.sleep(100)
    }
What I expected:
doOnNext 0
subscribe 0
doOnNext 1
doOnNext 2
subscribe 2
doOnNext 3
doOnNext 4
doOnNext 5
subscribe 5
doOnNext 6
doOnNext 7
subscribe 7
doOnNext 8
doOnNext 9
doOnNext 10
subscribe 10
...
What I got:
doOnNext 0
subscribe 0
doOnNext 1
doOnNext 2
subscribe 1
doOnNext 3
doOnNext 4
doOnNext 5
subscribe 2
doOnNext 6
doOnNext 7
subscribe 3
doOnNext 8
doOnNext 9
doOnNext 10
subscribe 4
...
doOnNext 325
subscribe 127
doOnNext 326
doOnNext 327
doOnNext 328
subscribe 246
...
Upvotes: 1
Views: 371
Reputation: 25603
Your problem actually lies in observeOn, which requests up to 128 items by default and passes this request up to onBackpressureLatest. Because that request is outstanding, onBackpressureLatest hands items straight through to observeOn's buffer instead of dropping the older ones, so it does not behave as you're expecting.
You can use .observeOn(Scheduler, boolean, int) to specify the buffer size, which should fix the behavior you're seeing.
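As a rough sketch of that suggestion, here is the question's pipeline with the three-argument observeOn overload and a buffer size of 1, so observeOn only asks upstream for one item at a time. The imports assume RxJava 3 package names (for RxJava 2 they would be io.reactivex.Flowable and io.reactivex.schedulers.Schedulers). The main wrapper, the one-second sleep, and the dispose call are only there to make the snippet runnable on its own and are not part of the original code.

```kotlin
import io.reactivex.rxjava3.core.Flowable
import io.reactivex.rxjava3.schedulers.Schedulers
import java.util.concurrent.TimeUnit

fun main() {
    val disposable = Flowable.interval(40, TimeUnit.MILLISECONDS)
        .doOnNext { println("doOnNext $it") }
        .onBackpressureLatest()
        // delayError = false, bufferSize = 1: observeOn requests a single
        // item at a time instead of the default 128, so onBackpressureLatest
        // keeps only the newest value while the consumer is busy.
        .observeOn(Schedulers.single(), false, 1)
        .subscribe {
            println("subscribe $it")
            Thread.sleep(100) // simulate a slow consumer
        }

    // Assumption for a standalone run: keep the JVM alive for a second,
    // then stop the stream.
    Thread.sleep(1000)
    disposable.dispose()
}
```

With this change the "subscribe" lines should skip ahead to recent values, much like the expected trace in the question, though the exact numbers depend on timing.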
Upvotes: 6