Reputation: 832
I'm having a hard time understanding the behaviour of the following code sample:
Flowable<String> f = Flowable.just(1)
    .flatMap(it -> Flowable.create(e -> {
        for (int i = 1; i < 1001; ++i) {
            log.info("Emitting: " + i);
            if (i % 10 == 0) {
                Thread.sleep(1000);
            }
            e.onNext(i);
        }
        e.onComplete();
    }, BackpressureStrategy.BUFFER))
    .map(String::valueOf)
    .subscribeOn(Schedulers.io())
    .observeOn(Schedulers.newThread());
f.subscribe(val -> {
    Thread.sleep(100);
    log.info("Observing: " + val);
});
Thread.sleep(1000000);
The code works fine until 128 items have been observed by the subscribe call; up to that point, emitting and observing run in parallel. After that, the Flowable continues to emit items (which are obviously queued somewhere), but no item is observed until all 1000 items have been emitted. Only then are the remaining items (those after the first 128) observed.
This looks related to the backpressure buffer size of 128, but I would still expect emitting and observing to run in parallel for all 1000 items, because the observer is obviously not slower than the emitter. Is there something I'm missing here? What should I do to fix the code?
Upvotes: 5
Views: 1065
Reputation: 70017
This is due to a same-pool deadlock between create and subscribeOn:
If there is a create(FlowableOnSubscribe, BackpressureStrategy) type source up in the chain, it is recommended to use subscribeOn(scheduler, false) instead to avoid same-pool deadlock because requests may pile up behind an eager/blocking emitter.
//...
.subscribeOn(Schedulers.io(), false)
//...
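To illustrate the pile-up without RxJava, here is a minimal sketch that uses only java.util.concurrent. It is a model of the situation, not RxJava's actual implementation: the class SamePoolPileUp, the run method and the event labels are made up for illustration. A single-threaded executor stands in for the io worker, a task that stays busy stands in for the blocking emitter, and a small Runnable stands in for the downstream request. The flag decides whether the request is queued on the emitter's worker (roughly what requestOn = true does) or handled directly on the calling thread (roughly requestOn = false).

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SamePoolPileUp {

    // Hypothetical model: returns the order in which the events happened.
    static List<String> run(boolean requestOnWorker) throws InterruptedException {
        List<String> events = new CopyOnWriteArrayList<>();
        // Stand-in for the single io worker that subscribeOn uses.
        ExecutorService worker = Executors.newSingleThreadExecutor();
        CountDownLatch emitterStarted = new CountDownLatch(1);
        CountDownLatch requestIssued = new CountDownLatch(1);

        // The "emitter" occupies the worker until the request has been issued,
        // like the create() loop that keeps the io thread busy.
        worker.execute(() -> {
            events.add("emitter start");
            emitterStarted.countDown();
            try {
                requestIssued.await();
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
            events.add("emitter done");
        });

        emitterStarted.await();
        Runnable request = () -> events.add("request handled");
        if (requestOnWorker) {
            // Queued behind the emitter on the same worker.
            worker.execute(request);
        } else {
            // Handled right away on the requesting thread.
            request.run();
        }
        requestIssued.countDown();

        // shutdown() still lets already queued tasks run to completion.
        worker.shutdown();
        worker.awaitTermination(5, TimeUnit.SECONDS);
        return events;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("same worker -> " + run(true));
        System.out.println("direct      -> " + run(false));
    }
}
```

In the same-worker case the request is only handled after the emitter has finished, which mirrors the requests from observeOn sitting in the queue until all 1000 items have been emitted. In the direct case the request is handled while the emitter is still running.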
Edit:
I tried the original example (plus your suggested fix) by replacing Flowable.create with a Flowable.range but I didn't encounter a problem. Can you give an example when problems may occur?
Flowable.range(1, 10)
    .subscribeOn(Schedulers.io(), false)
    .doOnNext(v -> System.out.println(Thread.currentThread().getName()))
    .observeOn(Schedulers.single(), false, 1)
    .blockingSubscribe();
This initially prints RxCachedThreadScheduler-1 and then RxSingleScheduler-1 9 times, because observeOn's replenishment request runs on the single scheduler instead of being routed back to the io scheduler. Try it with subscribeOn(Schedulers.io(), true) to compare.
Upvotes: 5