Reputation: 6487
The following function:
private Boolean canDoIt(Parameter param) {
    return myService
        .getMyObjectInReactiveWay(param)
        .map(myObject -> myService.checkMyObjectInImperativeWay(myObject))
        .block();
}
works fine at runtime, but when I test a flow that uses it with WebTestClient, I get the following error:
java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread parallel-1
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:83) ~[reactor-core-3.4.1.jar:3.4.1]
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Assembly trace from producer [reactor.core.publisher.MonoFlatMap] :
reactor.core.publisher.Mono.flatMap
I know I am not supposed to use block(), but I have no other choice: the function has to return a Boolean (not a Mono<Boolean>). Perhaps there is an alternative way to write it which doesn't use block().
Is there a way I can make WebTestClient not throw that error?
Using Reactor Core version 3.4.6.
Upvotes: 8
Views: 11699
Reputation: 15930
Distilling amanin's answer, here is the TL;DR:
Scheduler scheduler = Schedulers.boundedElastic();
Mono
    .fromCallable(myMono::block)
    .subscribeOn(scheduler)
    .subscribe(val -> {});
Where myMono is the Mono to block.
It works because block() refuses to run on threads that Reactor marks as non-blocking, such as the Schedulers.parallel() threads named in the error message (parallel-1), while Schedulers.boundedElastic() runs work on threads where blocking is allowed.
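The TL;DR above subscribes and discards the value. If the result is needed further down the chain, one option is to return the wrapped Mono and flatMap into it. The following is only a sketch under assumptions: the class name BlockOnElastic and the helper names blockOnElastic and demo are made up for illustration, and the source Mono is a stand-in for the real one.

```java
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.time.Duration;

public class BlockOnElastic {

    // Hypothetical helper: wraps source.block() so that the blocking call
    // runs on a boundedElastic thread instead of the subscriber's thread.
    static <T> Mono<T> blockOnElastic(Mono<T> source) {
        return Mono.fromCallable(() -> source.block())
                .subscribeOn(Schedulers.boundedElastic());
    }

    // Hypothetical demo: the outer delay emits on a parallel thread, and the
    // blocking call is reached through flatMap rather than called directly.
    static String demo() {
        Mono<String> source = Mono.delay(Duration.ofMillis(10)).map(tick -> "done");
        return Mono.delay(Duration.ofMillis(10))
                .flatMap(tick -> blockOnElastic(source))
                .block(); // this outer block() is only for the demo, called from a non-Reactor thread
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The point of the design is that subscribeOn is applied to the Mono that wraps the block() call, not to the Mono being blocked on.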
Upvotes: 2
Reputation: 4139
Confirming my comment: block() checks whether the calling thread is compatible with blocking code (a thread external to Reactor, or a thread of a specific Reactor scheduler like Schedulers.boundedElastic()).
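To see which threads Reactor treats as non-blocking, Schedulers.isInNonBlockingThread() can be called from different schedulers. This is a minimal sketch, not part of the original answer; the class name NonBlockingThreadCheck and its helper methods are hypothetical.

```java
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

public class NonBlockingThreadCheck {

    // Runs the check on a thread of the given scheduler and returns the result.
    static boolean isNonBlocking(Scheduler scheduler) {
        Boolean result = Mono.fromCallable(() -> Schedulers.isInNonBlockingThread())
                .subscribeOn(scheduler)
                .block(); // allowed as long as the caller is not itself a non-blocking Reactor thread
        return Boolean.TRUE.equals(result);
    }

    static boolean onParallel() {
        return isNonBlocking(Schedulers.parallel());
    }

    static boolean onBoundedElastic() {
        return isNonBlocking(Schedulers.boundedElastic());
    }

    public static void main(String[] args) {
        System.out.println("main thread: " + Schedulers.isInNonBlockingThread());
        System.out.println("parallel: " + onParallel());
        System.out.println("boundedElastic: " + onBoundedElastic());
    }
}
```

Run from a plain main thread, this should report false for the main thread, true for parallel and false for boundedElastic, which matches where block() is rejected in the error messages.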
There are 2 ways to handle blocking calls in the middle of a reactive stack:
- Share the reactive stream with share() before calling block() on it (the "possible workaround" in the example below).
- Force the block() call to be executed on a blocking-compatible scheduler using subscribeOn or publishOn. Beware, these operators should not be called on the publisher that directly calls block(), but on the publisher that will "wrap" the block call (see example below).
And a minimal reproducible example (tested on v3.4.6) giving this output:
Ok context: not running from reactor Threads
value is true
Problematic stack: working with scheduler not compatible with blocking call
ERROR: block()/blockFirst()/blockLast() are blocking, which is not supported in thread parallel-2
Bad way to subscribe on a blocking compatible scheduler
ERROR: block()/blockFirst()/blockLast() are blocking, which is not supported in thread parallel-4
Bad way to publish on blocking compatible scheduler
ERROR: block()/blockFirst()/blockLast() are blocking, which is not supported in thread parallel-6
Possible workaround: share the reactive stream before blocking on it
It worked
Right way to subscribe on blocking compatible scheduler
It worked
Right way to publish on blocking compatible scheduler
It worked
Here comes the code:
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.time.Duration;
import java.util.concurrent.Callable;
import java.util.function.Supplier;

public class BlockingWorkaround {

    public static void main(String[] args) throws Exception {
        System.out.println("Ok context: not running from reactor Threads");
        System.out.println("value is " + blockingFunction());

        System.out.println("Problematic stack: working with scheduler not compatible with blocking call");
        executeAndWait(() -> blockingFunction());

        System.out.println("Bad way to subscribe on a blocking compatible scheduler");
        executeAndWait(() -> blockingFunctionUsingSubscribeOn());

        System.out.println("Bad way to publish on blocking compatible scheduler");
        executeAndWait(() -> blockingFunctionUsingPublishOn());

        System.out.println("Possible workaround: share the reactive stream before blocking on it");
        executeAndWait(() -> blockingFunctionShared());

        System.out.println("Right way to subscribe on blocking compatible scheduler");
        subscribeOnAndWait(() -> blockingFunction());

        System.out.println("Right way to publish on blocking compatible scheduler");
        publishOnAndWait(() -> blockingFunction());
    }

    static Boolean blockingFunction() {
        return delay()
                .flatMap(delay -> Mono.just(true))
                .block();
    }

    static Boolean blockingFunctionShared() {
        return delay()
                .flatMap(delay -> Mono.just(true))
                .share() // Mono result is cached internally
                .block();
    }

    static Boolean blockingFunctionUsingSubscribeOn() {
        return delay()
                .subscribeOn(Schedulers.boundedElastic())
                .flatMap(delay -> Mono.just(true))
                .block();
    }

    static Boolean blockingFunctionUsingPublishOn() {
        return delay()
                .flatMap(delay -> Mono.just(true))
                .publishOn(Schedulers.boundedElastic())
                .block();
    }

    static Mono<Long> delay() {
        return Mono.delay(Duration.ofMillis(10));
    }

    private static void executeAndWait(Supplier<Boolean> blockingAction) throws InterruptedException {
        delay()
                .map(it -> blockingAction.get())
                .subscribe(
                        val -> System.out.println("It worked"),
                        err -> System.out.println("ERROR: " + err.getMessage())
                );
        Thread.sleep(100);
    }

    private static void subscribeOnAndWait(Callable<Boolean> blockingAction) throws InterruptedException {
        final Mono<Boolean> blockingMono = Mono.fromCallable(blockingAction)
                .subscribeOn(Schedulers.boundedElastic()); // Upstream is executed on given scheduler
        delay()
                .flatMap(it -> blockingMono)
                .subscribe(
                        val -> System.out.println("It worked"),
                        err -> System.out.println("ERROR: " + err.getMessage())
                );
        Thread.sleep(100);
    }

    private static void publishOnAndWait(Supplier<Boolean> blockingAction) throws InterruptedException {
        delay()
                .publishOn(Schedulers.boundedElastic()) // Cause downstream to be executed on given scheduler
                .map(it -> blockingAction.get())
                .subscribe(
                        val -> System.out.println("It worked"),
                        err -> System.out.println("ERROR: " + err.getMessage())
                );
        Thread.sleep(100);
    }
}
Upvotes: 4