Reputation: 688
I am having an issue with RxJava backpressure. I have a producer that emits items faster than the consumer can handle them. I want a buffer queue so that the consumer only receives the items it can deal with and requests more as it completes some of them, as in this example:
import rx.lang.scala.{Observable, Subscriber}
import rx.lang.scala.schedulers.{ComputationScheduler, NewThreadScheduler}

object Tester extends App {
  // Producer: emits one item every 100 ms
  Observable[Int] { subscriber =>
    (1 to 100).foreach { e =>
      subscriber.onNext(e)
      Thread.sleep(100)
      println("produced " + e + "(" + Thread.currentThread().getName + Thread.currentThread().getId + ")")
    }
  }
    .subscribeOn(NewThreadScheduler())
    .observeOn(ComputationScheduler())
    .subscribe(
      new Subscriber[Int]() {
        // Ask for two items up front
        override def onStart(): Unit = {
          request(2)
        }
        // Consumer: takes one second per item, then asks for one more
        override def onNext(value: Int): Unit = {
          Thread.sleep(1000)
          println("consumed " + value + "(" + Thread.currentThread().getName + Thread.currentThread().getId + ")")
          request(1)
        }
        override def onCompleted(): Unit = {
          println("finished ")
        }
      })
  // Keep the main thread alive while the pipeline runs
  Thread.sleep(100000)
}
I expect to get output like this:
produced 1(RxNewThreadScheduler-113)
consumed 1(RxComputationThreadPool-312)
produced 2(RxNewThreadScheduler-113)
consumed 2(RxComputationThreadPool-312)
produced 3(RxNewThreadScheduler-113)
consumed 3(RxComputationThreadPool-312)
......
but instead, I get:
produced 1(RxNewThreadScheduler-113)
produced 2(RxNewThreadScheduler-113)
produced 3(RxNewThreadScheduler-113)
produced 4(RxNewThreadScheduler-113)
produced 5(RxNewThreadScheduler-113)
produced 6(RxNewThreadScheduler-113)
produced 7(RxNewThreadScheduler-113)
produced 8(RxNewThreadScheduler-113)
produced 9(RxNewThreadScheduler-113)
consumed 1(RxComputationThreadPool-312)
produced 10(RxNewThreadScheduler-113)
produced 11(RxNewThreadScheduler-113)
produced 12(RxNewThreadScheduler-113)
produced 13(RxNewThreadScheduler-113)
.....
Upvotes: 1
Views: 475
Reputation: 28301
When you implement your Observable using Observable.create, it is up to you to manage backpressure, which is not a simple task. Here your observable simply ignores reactive pull requests: you just iterate and call onNext for every element, without waiting for a request before moving on to the next one.
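To illustrate what "waiting for a request" means, here is a minimal sketch of a source that only emits as many items as the consumer has asked for. It deliberately uses the JDK 9+ java.util.concurrent.Flow interfaces instead of RxJava, so that it runs without any dependency; the names RequestDrivenSource, RangePublisher and run are hypothetical, and everything happens synchronously on one thread, unlike the scheduler-based code in the question.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

final class RequestDrivenSource {

    /** Hypothetical publisher: emits 1..max (assumes max >= 1), never more than requested. */
    static final class RangePublisher implements Flow.Publisher<Integer> {
        private final int max;
        private final List<String> log;

        RangePublisher(int max, List<String> log) {
            this.max = max;
            this.log = log;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int next = 1;       // next value to emit
                private long requested = 0; // demand not yet fulfilled
                private boolean emitting;   // true while the emission loop is running
                private boolean done;

                @Override
                public void request(long n) {
                    if (done || n <= 0) {
                        return;
                    }
                    requested += n;
                    if (emitting) {
                        return; // re-entrant call from onNext: the running loop sees the new demand
                    }
                    emitting = true;
                    while (requested > 0 && !done) {
                        requested--;
                        Integer value = next++;
                        log.add("produced " + value);
                        subscriber.onNext(value);
                        if (next > max && !done) {
                            done = true;
                            subscriber.onComplete();
                        }
                    }
                    emitting = false;
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        }
    }

    /** Subscribes a consumer that requests 2 items up front and 1 more after each item. */
    static List<String> run(int max) {
        List<String> log = new ArrayList<>();
        new RangePublisher(max, log).subscribe(new Flow.Subscriber<Integer>() {
            private Flow.Subscription subscription;

            @Override
            public void onSubscribe(Flow.Subscription s) {
                subscription = s;
                s.request(2);
            }

            @Override
            public void onNext(Integer item) {
                log.add("consumed " + item);
                subscription.request(1);
            }

            @Override
            public void onError(Throwable t) {
                log.add("error " + t);
            }

            @Override
            public void onComplete() {
                log.add("finished");
            }
        });
        return log;
    }

    public static void main(String[] args) {
        run(3).forEach(System.out::println);
    }
}
```

The point of the sketch is the request method: the loop stops as soon as the outstanding demand reaches zero, which is exactly what the foreach loop in the question does not do.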
If possible, try to use Observable factory methods such as range and compose them with map/flatMap to obtain the desired source Observable, as those will respect backpressure.
Otherwise, have a look at the experimental utility classes introduced recently for correctly managing backpressure in an OnSubscribe implementation: AsyncOnSubscribe and SyncOnSubscribe.
Here is a quite naïve example:
import rx.Observable;
import rx.observables.SyncOnSubscribe;

Observable<Integer> backpressuredObservable =
    Observable.create(SyncOnSubscribe.<Integer, Integer>createStateful(
        () -> 0, // initial state: nothing emitted yet
        (state, obs) -> {
            int i = state + 1; // first emitted value is 1, as desired
            obs.onNext(i);
            if (i == 100) { // 100 is the last value, so complete there
                obs.onCompleted();
            }
            return i; // the new state is the last value emitted
        }));
Upvotes: 1