Courageous Dilemma

Reputation: 53

RxJava Flowable cache to Single deadlock

Below is my code snippet.

I know you are not supposed to block cachedFlowable like this, but this is just an example.

It gets stuck at the blockingGet line.

If I replace singleOrError with singleElement, the code will still get stuck. If I replace singleOrError with firstElement, the code will no longer get stuck.

Can someone please explain to me why this is the case?

    public static void main(String[] args) {
        final Flowable<Integer> cachedFlowable = Flowable.just(1).cache();
        cachedFlowable
                .doOnNext(i -> {
                    System.out.println("doOnNext " + i);
                    final Integer j = cachedFlowable.singleOrError().blockingGet();
                    System.out.println("after blockingGet " + j);
                })
                .blockingSubscribe();
    }

Upvotes: 1

Views: 339

Answers (1)

akarnokd

Reputation: 69997

The reason it deadlocks with the singleX operators is that they have to wait for either a possible second item or completion before they can emit. Because blockingGet blocks the emitting thread inside doOnNext, the main source can never deliver that second item or the completion. The firstX operators only care about the very first item, so they unblock almost immediately, which allows the source to complete.
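To make the difference concrete, here is a rough plain-Java model of the situation. It is a sketch only and not how RxJava is implemented: `OneItemSource`, `awaitFirst`, `awaitSingle`, and `waitInsideCallback` are hypothetical names invented for this illustration, and a 200 ms timeout stands in for the indefinite hang so the program can finish.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class SingleVsFirstModel {

    /** Hypothetical stand-in for a synchronous source: one item, then completion, on the caller's thread. */
    static final class OneItemSource {
        private final CountDownLatch firstItem = new CountDownLatch(1);
        private final CountDownLatch completed = new CountDownLatch(1);

        void emit(Runnable onNext) {
            firstItem.countDown();   // the item is now available (as if cached)
            onNext.run();            // the downstream callback runs on the emitting thread
            completed.countDown();   // completion is only reached after the callback returns
        }

        /** "first"-style wait: satisfied as soon as one item exists. */
        boolean awaitFirst(long millis) throws InterruptedException {
            return firstItem.await(millis, TimeUnit.MILLISECONDS);
        }

        /** "single"-style wait: needs completion to be sure there is no second item. */
        boolean awaitSingle(long millis) throws InterruptedException {
            return completed.await(millis, TimeUnit.MILLISECONDS);
        }
    }

    /** Blocks inside the item callback; returns true if the wait finished before the timeout. */
    static boolean waitInsideCallback(boolean singleStyle) {
        OneItemSource source = new OneItemSource();
        boolean[] finished = {false};
        source.emit(() -> {
            try {
                finished[0] = singleStyle
                        ? source.awaitSingle(200)
                        : source.awaitFirst(200);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        return finished[0];
    }

    public static void main(String[] args) {
        System.out.println("first-style wait finished: " + waitInsideCallback(false));
        System.out.println("single-style wait finished: " + waitInsideCallback(true));
    }
}
```

Running it prints `first-style wait finished: true` and then `single-style wait finished: false`. The single-style wait can only finish once the callback it is blocking has returned, so without the timeout it would never return.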

So yes, you should not use blocking methods inside a flow like that. Instead, use flatMap or concatMap to run a per-item subflow:

    var cache = Flowable.just(1).cache();

    cache
            .doOnNext(i -> System.out.println("doOnNext " + i))
            // subscribe to the cache again per item, without blocking the emitting thread
            .concatMapSingle(item -> cache.firstOrError())
            .doOnNext(j -> System.out.println("after " + j))
            .blockingSubscribe();

Upvotes: 2
