Reputation: 121
My understanding from looking through the API is that using Schedulers.boundedElastic() or variants like Schedulers.newBoundedElastic(3, 10, "MyThreadGroup"); or Schedulers.fromExecutor(executor) allows for processing an IO operation in more than one thread.
But a simulation with the following sample code appears to indicate a single thread/same thread is doing the work in the flatMap
Flux.range(0, 100)
.flatMap(i -> {
try {
// IO operation
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Mapping for " + i + " is done by thread " + Thread.currentThread().getName());
return Flux.just(i);
})
.subscribeOn(Schedulers.boundedElastic())
.subscribe();
Thread.sleep(10000); // main thread
//This yields the following
Mapping for 0 is done by thread boundedElastic-1
Mapping for 1 is done by thread boundedElastic-1
Mapping for 2 is done by thread boundedElastic-1
Mapping for 3 is done by thread boundedElastic-1 ...
The above output suggests to me the same Thread is running within the flatMap. Is there a way to get more than one Thread to process items when the flatMap is invoked on subcribe for multiple IO? I was expecting to see boundedElastic-1, boundedElastic-2 ... .
Upvotes: 4
Views: 11015
Reputation: 121
One way to get the flatMap running on multiple Threads is to create a ParallelFlux. The sample code below does the trick.
Flux.range(0, 1000)
.parallel()
.runOn(Schedulers.boundedElastic())
.flatMap(i -> {
try {
// IO operation
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("second Mapping for " + i + " is done by thread " + Thread.currentThread().getName());
return Flux.just(i);
})
.subscribe();
Thread.sleep(10000);
Upvotes: 2
Reputation: 9947
1. Concurrency with non-blocking IO (preferred)
If you have the chance to use non-blocking IO (like Spring WebClient), then you don't need to worry about threads or schedulers and you get concurrency out of the box:
Flux.range(0, 100)
.flatMap(i -> Mono.delay(Duration.ofMillis(500)) // e.g.: reactive webclient call
.doOnNext(x -> System.out.println("Mapping for " + i + " is done by thread " + Thread.currentThread()
.getName())))
.subscribe();
2. Concurrency with blocking IO
It's better to avoid blocking IO if you have the choice. In case you can't avoid it, you just need to make a slight modification to your code and apply subscribeOn
to the inner Mono
:
Flux.range(0, 100)
.flatMap(i -> Mono.fromRunnable(() -> {
try {
// IO operation
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Mapping for " + i + " is done by thread " + Thread.currentThread().getName());
}).subscribeOn(Schedulers.boundedElastic()))
.subscribe();
Upvotes: 8