Reputation: 53
Below is my code snippet. I know you are not supposed to block cachedFlowable like this, but this is just an example. It gets stuck at the blockingGet line.
If I replace singleOrError with singleElement, the code will still get stuck. If I replace singleOrError with firstElement, the code will no longer get stuck.
Can someone please explain to me why this is the case?
public static void main(String[] args) {
    final Flowable<Integer> cachedFlowable = Flowable.just(1).cache();
    cachedFlowable
        .doOnNext(i -> {
            System.out.println("doOnNext " + i);
            final Integer j = cachedFlowable.singleOrError().blockingGet();
            System.out.println("after blockingGet " + j);
        })
        .blockingSubscribe();
}
Reputation: 69997
The reason it deadlocks with the singleX operators is that they wait for a possible second item before emitting, but since you are blocking inside doOnNext, neither a second item nor the completion signal from the main source can be delivered. The firstX operators only care about the very first item, so they unblock almost immediately, which allows the source to complete.
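To make the mechanism concrete, here is a toy model that uses only the JDK, not RxJava. ToySource, waitInsideOnNext, and the 200 ms timeout are hypothetical names and values made up for this sketch. It assumes the source delivers the item and then the completion signal on the same thread, and it replaces the indefinite block with a timed wait so the program can report the outcome instead of hanging.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class SingleVsFirstSketch {

    // Hypothetical stand-in for a cached one-item source: the latches record
    // which signals have been delivered so far.
    static final class ToySource {
        final CountDownLatch firstItem = new CountDownLatch(1);
        final CountDownLatch completed = new CountDownLatch(1);

        // Delivers one item and then completion, both on the calling thread.
        void emit(Runnable onNext) {
            firstItem.countDown();  // the item is now visible to late subscribers
            onNext.run();           // completion cannot be sent until this returns
            completed.countDown();
        }
    }

    // Returns true if a wait started inside onNext finishes before the timeout.
    // needsCompletion = true models singleX; false models firstX.
    public static boolean waitInsideOnNext(boolean needsCompletion) {
        ToySource source = new ToySource();
        boolean[] finished = new boolean[1];
        source.emit(() -> {
            CountDownLatch signal = needsCompletion ? source.completed : source.firstItem;
            try {
                // Timed wait instead of an indefinite block, so the sketch terminates.
                finished[0] = signal.await(200, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        return finished[0];
    }

    public static void main(String[] args) {
        System.out.println("first-style wait finished: " + waitInsideOnNext(false));   // true
        System.out.println("single-style wait finished: " + waitInsideOnNext(true));   // false
    }
}
```

The single-style wait times out because the only thread that could signal completion is the one waiting for it. With an untimed blockingGet, that wait would never end.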
So yes, you should not use blocking methods in flows like that; instead, use flatMap or concatMap to run a per-item subflow:
var cache = Flowable.just(1).cache();
cache
    .doOnNext(i -> System.out.println("doOnNext " + i))
    .concatMapSingle(item -> cache.firstOrError())
    .doOnNext(j -> System.out.println("after " + j))
    .blockingSubscribe();