Dayo

Reputation: 121

Schedulers.boundedElastic appears to use the same thread for processing

My understanding from looking through the API is that Schedulers.boundedElastic(), or variants such as Schedulers.newBoundedElastic(3, 10, "MyThreadGroup") or Schedulers.fromExecutor(executor), allow IO operations to be processed on more than one thread.

But a simulation with the following sample code suggests that a single thread is doing all the work in the flatMap:

Flux.range(0, 100)
        .flatMap(i -> {
            try {
                // IO operation
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("Mapping for " + i + " is done by thread " + Thread.currentThread().getName());
            return Flux.just(i);
        })
        .subscribeOn(Schedulers.boundedElastic())
        .subscribe();

Thread.sleep(10000); // main thread

This yields the following output:

Mapping for 0 is done by thread boundedElastic-1
Mapping for 1 is done by thread boundedElastic-1
Mapping for 2 is done by thread boundedElastic-1
Mapping for 3 is done by thread boundedElastic-1 ...

The output above suggests that the same thread runs every flatMap invocation. Is there a way to have more than one thread process items when the flatMap is invoked on subscribe for multiple IO operations? I was expecting to see boundedElastic-1, boundedElastic-2, ...

Upvotes: 4

Views: 11015

Answers (2)

Dayo

Reputation: 121

One way to get the flatMap running on multiple threads is to create a ParallelFlux and move its rails onto the scheduler with runOn. The sample code below does the trick.

Flux.range(0, 1000)
        .parallel()
        .runOn(Schedulers.boundedElastic())
        .flatMap(i -> {
            try {
                // IO operation
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("second Mapping for " + i + " is done by thread " + Thread.currentThread().getName());
            return Flux.just(i);
        })
        .subscribe();

Thread.sleep(10000);
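
As a variation on the above, here is a minimal sketch that sets the number of rails explicitly with parallel(int) and merges the rails back into a single Flux with sequential(), so the caller can block for the results instead of sleeping on the main thread. The class name ParallelRailsSketch, the run helper, the THREADS set and the 20 ms sleep are made up for illustration, and the sleep only stands in for a blocking IO call. The order of the collected results is not guaranteed.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public class ParallelRailsSketch {

    // Names of the threads that did the work (kept only for illustration).
    static final Set<String> THREADS = ConcurrentHashMap.newKeySet();

    // Hypothetical helper: processes `count` items on `rails` parallel rails.
    static List<Integer> run(int count, int rails) {
        return Flux.range(0, count)
                .parallel(rails)                       // split into a fixed number of rails
                .runOn(Schedulers.boundedElastic())    // each rail runs on its own worker
                .map(i -> {
                    try {
                        Thread.sleep(20);              // stand-in for a blocking IO call
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    THREADS.add(Thread.currentThread().getName());
                    return i;
                })
                .sequential()                          // merge the rails back into one Flux
                .collectList()
                .block();                              // wait for completion
    }

    public static void main(String[] args) {
        List<Integer> results = run(40, 4);
        System.out.println(results.size() + " items processed by " + THREADS);
    }
}
```

Blocking with block() is only reasonable here because this is a standalone demo started from a non-reactive main thread.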

Upvotes: 2

Martin Tarjányi

Reputation: 9947

1. Concurrency with non-blocking IO (preferred)

If you have the chance to use non-blocking IO (like Spring WebClient), then you don't need to worry about threads or schedulers and you get concurrency out of the box:

Flux.range(0, 100)
        .flatMap(i -> Mono.delay(Duration.ofMillis(500)) // e.g.: reactive webclient call
                .doOnNext(x -> System.out.println("Mapping for " + i + " is done by thread " + Thread.currentThread()
                        .getName())))
        .subscribe();

2. Concurrency with blocking IO

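It's better to avoid blocking IO if you have the choice. If you can't avoid it, a slight modification to your code is enough: apply subscribeOn to the inner Mono rather than to the outer Flux. subscribeOn on the outer Flux moves the whole pipeline, including every call of the flatMap lambda, onto a single worker thread, whereas on the inner Mono each inner subscription gets its own worker from the scheduler: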

Flux.range(0, 100)
        .flatMap(i -> Mono.fromRunnable(() -> {
            try {
                // IO operation
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("Mapping for " + i + " is done by thread " + Thread.currentThread().getName());
        }).subscribeOn(Schedulers.boundedElastic()))
        .subscribe();
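
If the blocking call needs to return a value, or you want to cap how many calls are in flight at once, the same idea can be sketched with Mono.fromCallable and the flatMap overload that takes a concurrency argument. This is only a sketch under assumptions: the class name BlockingFlatMapSketch, the blockingLookup helper, the THREADS set and the numbers used are hypothetical, and the sleep stands in for real blocking IO. Since flatMap does not preserve order, the results may arrive in any order.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class BlockingFlatMapSketch {

    // Names of the threads that ran the blocking calls (kept only for illustration).
    static final Set<String> THREADS = ConcurrentHashMap.newKeySet();

    // Hypothetical stand-in for a blocking IO call that returns a value.
    static int blockingLookup(int i) throws InterruptedException {
        Thread.sleep(50);
        THREADS.add(Thread.currentThread().getName());
        return i * 2;
    }

    // Runs `count` blocking calls with at most `concurrency` inner Monos subscribed at once.
    static List<Integer> run(int count, int concurrency) {
        return Flux.range(0, count)
                .flatMap(i -> Mono.fromCallable(() -> blockingLookup(i))
                                .subscribeOn(Schedulers.boundedElastic()), // one worker per inner Mono
                        concurrency)
                .collectList()
                .block(); // wait for all calls to finish
    }

    public static void main(String[] args) {
        List<Integer> results = run(20, 8);
        System.out.println(results.size() + " results from threads " + THREADS);
    }
}
```

The concurrency argument limits how many inner publishers flatMap subscribes to at the same time, which is a simple way to avoid flooding a downstream service with blocking calls.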

Upvotes: 8
