Reputation: 4512
I have a PublishSubject
which emits location updates (LatLng
):
val location = PublishSubject.create<LatLng>()
Now, I want to do something with this location, that may take some time and should be done sequentially:
location
.observeOn(Schedulers.computation())
.subscribeOn(Schedulers.io())
.subscribe {
// ... CPU-heavy operation
}
Only after each operation in the subscribe
is complete, the next can run. Moreover, each update makes the previous one obsolete, so I'm only interested in the latest value. So, the subscriber should receive only the currently latest value at each update. Therefore, I thought of applying backpressure:
location
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.computation())
.toFlowable(BackpressureStrategy.LATEST)
.subscribeWith(object : DisposableSubscriber<LatLng>() {
override fun onStart() {
request(1)
}
override fun onError(t: Throwable) {
// ...
}
override fun onComplete() {
// ...
}
override fun onNext(t: LatLng) {
// ... CPU-heavy task
request(1)
}
})
Unfortunately, this doesn't work, as each LatLng
that was emitted is delivered to the subscriber and no values are skipped, as they should be.
Upvotes: 0
Views: 172
Reputation: 69997
The problem is that observeOn
always can buffer at least one element. You can accomplish the desired effect via delay
though:
location
.toFlowable(BackpressureStrategy.LATEST)
.delay(0, TimeUnit.SECONDS, Schedulers.computation())
.subscribeWith(object : DisposableSubscriber<LatLng>() {
override fun onStart() {
request(1)
}
override fun onError(t: Throwable) {
// ...
}
override fun onComplete() {
// ...
}
override fun onNext(t: LatLng) {
// ... CPU-heavy task
request(1)
}
})
Additional remarks:
subscribeOn
has no practical effect with a Subject
and can be ignoredtoFlowable
is best applied close to the source to avoid flooding delay
with items unnecessarily.Upvotes: 1