Mario Zsilak

Reputation: 43

Java Reactor: Is there a way to transform Flux<Mono<T>> into Flux<T> without eager fetching?

I have a fast, but expensive producer (Spring WebClient) and a very slow subscriber. I need a way to honor backpressure throughout the chain.

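During implementation I realized that flatMap, concatMap and similar operators fetch eagerly from upstream (flatMap requested 256 elements in the test below, regardless of what the subscriber asked for), and I could not find a way to disable this behavior.

Requesting 3 elements from the subscriber, without flatMap: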

Flux.defer(() -> Flux.range(1, 1000))
    .doOnRequest(i -> System.out.println("Requested: " + i))
    .doOnNext(v -> System.out.println("Emitted:   " + v))
    // Uncomment to see the eager request described further down:
    //.flatMap(Mono::just)
    .subscribe(new BaseSubscriber<Object>() {
        @Override
        protected void hookOnSubscribe(final Subscription subscription) {
            // Ask for exactly 3 elements and never request more.
            subscription.request(3);
        }

        @Override
        protected void hookOnNext(final Object value) {
            System.out.println("Received:  " + value);
        }
    });

This produces:

Requested: 3
Emitted:   1
Received:  1
Emitted:   2
Received:  2
Emitted:   3
Received:  3

With the same demand and the flatMap line uncommented, it produces:

Requested: 256
Emitted:   1
Received:  1
Emitted:   2
Received:  2
Emitted:   3
Received:  3
Emitted:   4
Emitted:   5
...
Emitted:   254
Emitted:   255
Emitted:   256

Upvotes: 1

Views: 1332

Answers (1)

Mario Zsilak

Reputation: 43

It seems there is an open issue for this: https://github.com/reactor/reactor-core/issues/1397

Anyway, I found a solution for my situation: block(). Keep in mind that this operation is only allowed on threads that are not marked as "non-blocking operations only". (See also Project BlockHound.)

To recap, the issue is that at some point I have a Flux<Mono<T>> and .flatMap(...), .concatMap(...), etc. use some kind of eager fetching. The Flux<Mono<T>> used for testing:

final Flux<Mono<Integer>> monoFlux = Flux.<Mono<Integer>, Integer>generate(
        () -> 0,
        (state, sink) -> {
            // Emit the next counter value wrapped in a Mono.
            final int next = state + 1;
            sink.next(Mono.just(next));
            return next;
        })
    .doOnRequest(i -> System.out.println("Requested: " + i))
    .doOnNext(v -> System.out.println("Emitted:   " + v));

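To avoid eager fetching, I now call block() inside a map, and it works surprisingly well:

// MySubscriber is not shown here; it is assumed to behave like the
// BaseSubscriber from the question (requests 3 and prints "Received:").
monoFlux.map(Mono::block)
        .subscribe(new MySubscriber<>());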

Result:

Requested: 3
Emitted:   MonoJust
Received:  1
Emitted:   MonoJust
Received:  2
Emitted:   MonoJust
Received:  3
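
For anyone who wants to see why the two variants differ without pulling in Reactor, here is a minimal sketch built only on the JDK's java.util.concurrent.Flow interfaces. It is a toy model, not Reactor's implementation: the class DemandDemo, the Range source, and the map and prefetching helpers are all made-up names for illustration, and the prefetching stage is deliberately simplified (it requests a fixed amount once and never replenishes). The point is only that a 1:1 stage can hand downstream demand straight to the source, while a buffering stage issues its own request that is independent of what the subscriber asked for.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.function.Function;

// Toy model only: names and structure are illustrative, not Reactor internals.
public class DemandDemo {

    /** Synchronous source of 1..max that records every request(n) it receives. */
    static final class Range implements Flow.Publisher<Integer> {
        final int max;
        final List<Long> requests = new ArrayList<>();
        int emitted = 0;

        Range(int max) { this.max = max; }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                int next = 1;

                @Override
                public void request(long n) {
                    requests.add(n);
                    for (long i = 0; i < n && next <= max; i++) {
                        emitted++;
                        subscriber.onNext(next++);
                    }
                }

                @Override
                public void cancel() { }
            });
        }
    }

    /** 1:1 stage: hands the upstream subscription to downstream, so demand passes through unchanged. */
    static <T, R> Flow.Publisher<R> map(Flow.Publisher<T> source, Function<T, R> fn) {
        return down -> source.subscribe(new Flow.Subscriber<T>() {
            @Override public void onSubscribe(Flow.Subscription up) { down.onSubscribe(up); }
            @Override public void onNext(T item) { down.onNext(fn.apply(item)); }
            @Override public void onError(Throwable t) { down.onError(t); }
            @Override public void onComplete() { down.onComplete(); }
        });
    }

    /** Buffering stage: requests `prefetch` items up front, whatever downstream asked for. */
    static <T> Flow.Publisher<T> prefetching(Flow.Publisher<T> source, int prefetch) {
        return down -> source.subscribe(new Flow.Subscriber<T>() {
            final ArrayDeque<T> buffer = new ArrayDeque<>();
            long demand = 0;

            // Deliver buffered items while downstream demand remains.
            void drain() {
                while (demand > 0 && !buffer.isEmpty()) {
                    demand--;
                    down.onNext(buffer.poll());
                }
            }

            @Override
            public void onSubscribe(Flow.Subscription up) {
                down.onSubscribe(new Flow.Subscription() {
                    @Override public void request(long n) { demand += n; drain(); }
                    @Override public void cancel() { up.cancel(); }
                });
                up.request(prefetch); // eager request, independent of downstream demand
            }

            @Override public void onNext(T item) { buffer.add(item); drain(); }
            @Override public void onError(Throwable t) { down.onError(t); }
            @Override public void onComplete() { down.onComplete(); }
        });
    }

    /** Subscribes with a one-off demand of 3 and reports what the source observed. */
    public static String run(boolean eager) {
        Range source = new Range(1000);
        List<Integer> received = new ArrayList<>();
        Flow.Publisher<Integer> chain;
        if (eager) {
            chain = prefetching(source, 256);
        } else {
            chain = map(source, i -> i);
        }
        chain.subscribe(new Flow.Subscriber<Integer>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(3); }
            @Override public void onNext(Integer item) { received.add(item); }
            @Override public void onError(Throwable t) { }
            @Override public void onComplete() { }
        });
        return "requested=" + source.requests + " emitted=" + source.emitted + " received=" + received;
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // demand of 3 reaches the source as-is
        System.out.println(run(true));  // source sees a request of 256 instead
    }
}
```

In this model the pass-through variant lets the source see a single request of 3, while the buffering variant makes it produce 256 items of which only 3 are delivered, which mirrors the two outputs in the question. The map(Mono::block) approach behaves like the first case, at the cost of blocking the emitting thread while each Mono resolves.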

Upvotes: 1
