ctranxuan

Reputation: 815

RxJava share() operator: slow subscribers affect fast subscribers

I have a Flowable shared between two subscribers: one that consumes the emitted items slowly, and one that consumes them fast.

Flowable<Long> sharedFlowable =
        Flowable.generate(() -> 0L, (state, emitter) -> {
            emitter.onNext(state);
            return state + 1;
        })
        .share()
        .cast(Long.class);

// computation() is a static import of Schedulers.computation()
sharedFlowable
        .observeOn(computation())
        .subscribeWith(new PaceSubscriber<>("arthur", 1000, 1));

sharedFlowable
        .observeOn(computation())
        .subscribeWith(new PaceSubscriber<>("ford", 1, 1));

Basically, PaceSubscriber performs a Thread.sleep() for each item it receives. The sleep duration is the 2nd argument of the constructor; the 3rd is the number of elements the subscriber requests via request().
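For reference, PaceSubscriber is roughly a DisposableSubscriber along these lines (a simplified sketch; the real class is in the linked code, and the names and logging here are only illustrative):

import io.reactivex.subscribers.DisposableSubscriber;

final class PaceSubscriber<T> extends DisposableSubscriber<T> {
    final String name;
    final long sleepMillis;
    final long requestAmount;

    PaceSubscriber(String name, long sleepMillis, long requestAmount) {
        this.name = name;
        this.sleepMillis = sleepMillis;
        this.requestAmount = requestAmount;
    }

    @Override
    protected void onStart() {
        request(requestAmount);            // initial request
    }

    @Override
    public void onNext(T item) {
        System.out.println(name + " ⇶ received onNext(): " + item);
        try {
            Thread.sleep(sleepMillis);     // simulate slow or fast consumption
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        request(requestAmount);            // ask for the next element(s)
    }

    @Override
    public void onError(Throwable t) {
        t.printStackTrace();
    }

    @Override
    public void onComplete() {
        System.out.println(name + " ⇶ completed");
    }
}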

What I observe is that the fast subscriber periodically gets blocked until the slow one catches up. For instance, with the above code, I got this kind of trace:

2018-04-03 13:39:44 [INFO ] [RxComputationThreadPool-2] INFO ford ⇶ received onNext(): 0
2018-04-03 13:39:44 [INFO ] [RxComputationThreadPool-2] INFO ford ⇶ received onNext(): 1
...
2018-04-03 13:39:44 [INFO ] [RxComputationThreadPool-2] INFO ford ⇶ received onNext(): 255
2018-04-03 13:39:47 [INFO ] [RxComputationThreadPool-3] INFO arthur ⇶ received onNext(): 130
2018-04-03 13:39:48 [INFO ] [RxComputationThreadPool-3] INFO arthur ⇶ received onNext(): 131
...
2018-04-03 13:41:21 [INFO ] [RxComputationThreadPool-3] INFO arthur ⇶ received onNext(): 224
2018-04-03 13:41:21 [INFO ] [RxComputationThreadPool-2] INFO ford ⇶ received onNext(): 257
2018-04-03 13:41:21 [INFO ] [RxComputationThreadPool-2] INFO ford ⇶ received onNext(): 258

Actually, I expected that each subscriber would process the incoming items at its own pace, without any interaction between them. In particular, I did not expect the slowest subscriber to slow down the fastest. Is this an issue or the expected behavior? I suspect it is related to the fact that observeOn() manages a queue of received items and to some backpressure support, but I would be glad to have some insightful explanations about it.

The full code can be found there.

Many thanks in advance!

Upvotes: 0

Views: 250

Answers (1)

akarnokd

Reputation: 70017

Actually, I expected that each of the subscribers would process the incoming items at their own pace without any interaction between them.

This is, by design, the behavior of publish, which share uses under the hood. Consumers are fed in lockstep so that, by default, no excess buffering has to happen due to any potential speed differences in consumption.
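A rough illustration of that lockstep dispatch (assuming RxJava 2.x; the test only shows the bound on emissions, not the exact internals):

import io.reactivex.Flowable;
import io.reactivex.flowables.ConnectableFlowable;
import io.reactivex.subscribers.TestSubscriber;

ConnectableFlowable<Integer> source = Flowable.range(1, 1_000).publish();

TestSubscriber<Integer> slow = source.test(2);    // requests only 2 items
TestSubscriber<Integer> fast = source.test(100);  // requests 100 items

source.connect();

slow.assertValueCount(2);
fast.assertValueCount(2);  // also got only 2: emission is bounded by the slowest consumer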

You have to manually unbound (decouple) the slower flow, via onBackpressureBuffer for example, but that will likely cause excess memory usage.
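Applied to the question's setup, that would look roughly like this (untested sketch; everything else kept as in the question):

// Decouple the slow consumer: onBackpressureBuffer() requests from the shared
// source without a bound and buffers whatever "arthur" has not processed yet,
// so "ford" is no longer held back (at the cost of memory).
sharedFlowable
        .onBackpressureBuffer()
        .observeOn(computation())
        .subscribeWith(new PaceSubscriber<>("arthur", 1000, 1));

// The fast consumer stays as before.
sharedFlowable
        .observeOn(computation())
        .subscribeWith(new PaceSubscriber<>("ford", 1, 1));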

This behavior is due to backpressure (the slowest path is not ready to receive more items), partly to avoid data loss in complicated timing situations.

Upvotes: 2
