proximator

Reputation: 688

RxJava/RxScala backpressure using request

I am having an issue with RxJava backpressure. I have one producer that emits more items than the consumer can handle. I want a buffer queue so that the consumer receives only as many items as it can deal with, and requests more once it has finished some of them, as in this example:

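import rx.lang.scala.{Observable, Subscriber}
import rx.lang.scala.schedulers.{ComputationScheduler, NewThreadScheduler}

object Tester extends App {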

Observable[Int] { subscriber =>
  (1 to 100).foreach { e =>
    subscriber.onNext(e)
    Thread.sleep(100)
    println("produced " + e + "(" + Thread.currentThread().getName + Thread.currentThread().getId + ")")
  }
}
.subscribeOn(NewThreadScheduler())
.observeOn(ComputationScheduler())
.subscribe(
  new Subscriber[Int]() {
    override def onStart(): Unit = {
      request(2)
    }

    override def onNext(value: Int): Unit = {
      Thread.sleep(1000)
      println("consumed " + value + "(" + Thread.currentThread().getName + Thread.currentThread().getId + ")")
      request(1)
    }

    override def onCompleted(): Unit = {
      println("finished ")
    }
})

Thread.sleep(100000) // keep the main thread alive while the pipeline runs
}

I expect to get output like

produced 1(RxNewThreadScheduler-113)
consumed 1(RxComputationThreadPool-312)
produced 2(RxNewThreadScheduler-113)
consumed 2(RxComputationThreadPool-312)
produced 3(RxNewThreadScheduler-113)
consumed 3(RxComputationThreadPool-312)
......

but instead, I get

produced 1(RxNewThreadScheduler-113)
produced 2(RxNewThreadScheduler-113)
produced 3(RxNewThreadScheduler-113)
produced 4(RxNewThreadScheduler-113)
produced 5(RxNewThreadScheduler-113)
produced 6(RxNewThreadScheduler-113)
produced 7(RxNewThreadScheduler-113)
produced 8(RxNewThreadScheduler-113)
produced 9(RxNewThreadScheduler-113)
consumed 1(RxComputationThreadPool-312)
produced 10(RxNewThreadScheduler-113)
produced 11(RxNewThreadScheduler-113)
produced 12(RxNewThreadScheduler-113)
produced 13(RxNewThreadScheduler-113)
.....

Upvotes: 1

Views: 475

Answers (1)

Simon Baslé

Reputation: 28301

When you implement your Observable using Observable.create, it is up to you to manage backpressure, which is not a simple task. Here your Observable simply ignores reactive pull requests: the loop calls onNext for every item as fast as it can, instead of emitting only as many items as the subscriber has requested.
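To make "respecting reactive pull" concrete, here is a minimal, library-free sketch of a producer that emits only while there is outstanding demand. It is not RxJava code and not how RxJava implements it: RangeProducer, its sink parameter (standing in for subscriber.onNext) and BackpressureSketch are hypothetical names made up for illustration, and the sketch assumes a single thread, whereas a real producer would need atomic request accounting.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntConsumer;

/** Hypothetical single-threaded model of a producer that honors request(n). */
final class RangeProducer {
    private final int end;          // last value to emit (inclusive)
    private final IntConsumer sink; // stands in for subscriber.onNext
    private int next;               // next value to emit
    private long requested;         // demand signalled but not yet satisfied
    private boolean emitting;       // true while the drain loop is running

    RangeProducer(int start, int end, IntConsumer sink) {
        this.next = start;
        this.end = end;
        this.sink = sink;
    }

    /** Called by the consumer to signal that it can accept n more items. */
    void request(long n) {
        if (n <= 0) {
            throw new IllegalArgumentException("n must be positive");
        }
        requested += n;
        if (emitting) {
            return; // requested from inside sink.accept: the running loop picks it up
        }
        emitting = true;
        try {
            // Emit only while there is demand left and values remain.
            while (requested > 0 && next <= end) {
                requested--;
                sink.accept(next++);
            }
        } finally {
            emitting = false;
        }
    }

    boolean isDone() {
        return next > end;
    }
}

final class BackpressureSketch {
    public static void main(String[] args) {
        List<Integer> received = new ArrayList<>();
        RangeProducer producer = new RangeProducer(1, 100, v -> received.add(v));

        producer.request(2);
        System.out.println(received); // [1, 2]

        producer.request(1);
        System.out.println(received); // [1, 2, 3]
    }
}
```

The point of the sketch is that nothing is emitted until request is called, and each call releases at most n items. The foreach loop in the question has no such check, so it pushes all 100 items regardless of what the subscriber asked for.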

If possible, build the source from Observable factory methods such as range, and compose them with map/flatMap to obtain the desired Observable, since those respect backpressure.

Otherwise, have a look at the experimental utility classes introduced recently for correctly managing backpressure in an OnSubscribe implementation: AsyncOnSubscribe and SyncOnSubscribe.

Here is a fairly naïve example:

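import rx.Observable;
import rx.observables.SyncOnSubscribe;

Observable<Integer> backpressuredObservable =
  Observable.create(SyncOnSubscribe.<Integer, Integer>createStateful(
    () -> 0, // initial state: nothing emitted yet
    (state, obs) -> {
        int i = state + 1; // next value to emit; the first one is 1
        obs.onNext(i);     // called once per requested item
        if (i == 100) {    // maximum is 100, stop there
            obs.onCompleted();
        }
        return i; // the emitted value becomes the new state
}));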

Upvotes: 1
