vinnygray

Reputation: 171

How do I process Flux events in parallel to each other?

I have streams of incoming events that need to be enriched, and then processed in parallel as they arrive.

I was thinking Project Reactor was made to order for the job, but in my tests all of the processing seems to be done serially.

Here is some test code:

ExecutorService executor = Executors.newFixedThreadPool(10);
System.out.println("Main thread: " + Thread.currentThread());
Flux<String> tick = Flux.interval(Duration.of(10, ChronoUnit.MILLIS))
        .map(i-> {
            System.out.println("ReactorTests.test " + Thread.currentThread());
            sleep(1000L); // simulate IO delay
            return String.format("String %d", i);
        })
        .take(3)
//    .subscribeOn(Schedulers.elastic());
//    .subscribeOn(Schedulers.newParallel("test"));
//    .subscribeOn(Schedulers.fromExecutor(executor));
;
tick.subscribe(x ->System.out.println("Subscribe thread: " + Thread.currentThread()), 
               System.out::println, 
               ()-> System.out.println("Done"));
System.out.println("DONE AND DONE");

I have tried uncommenting each of the commented lines; however, in every case the output shows that the same thread processes all of the events:

Main thread: Thread[main,5,main]
[DEBUG] (main) Using Console logging
DONE AND DONE
ReactorTests.test Thread[parallel-1,5,main]
Subscribe thread: Thread[parallel-1,5,main]
ReactorTests.test Thread[parallel-1,5,main]
Subscribe thread: Thread[parallel-1,5,main]
ReactorTests.test Thread[parallel-1,5,main]
Subscribe thread: Thread[parallel-1,5,main]
Done

(The only difference is that without the Schedulers, they are run on the subscribe thread, whereas with any of the executors, they all run in the same thread, which is not the subscribe thread.)

What am I missing?

FYI, here is the sleep method used above:

public static void sleep(long time) {
    try {
        Thread.sleep(time);
    } catch (InterruptedException e) {
        System.out.println("Exiting");
    }
}
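A minimal, self-contained reproduction of the behaviour described above, assuming Reactor 3.3+ (for Schedulers.boundedElastic()) is on the classpath. It uses Flux.range instead of Flux.interval, because Flux.interval(Duration) always emits its ticks on Schedulers.parallel(), which is why the worker appears as parallel-1 in the output. The helper sleepQuietly is a hypothetical stand-in for the sleep method above. The sketch records which threads run map and how long three 200 ms calls take. subscribeOn only moves the subscription, and therefore the whole chain, onto one worker, so the calls still run back to back on a single thread.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public class SequentialMapDemo {

    // Hypothetical stand-in for the question's sleep(long) helper.
    static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to callers
        }
    }

    // Returns {number of distinct threads that ran map, elapsed milliseconds}.
    public static long[] run() {
        Set<String> threads = ConcurrentHashMap.newKeySet();
        long start = System.nanoTime();
        Flux.range(1, 3)
            .map(i -> {
                threads.add(Thread.currentThread().getName());
                sleepQuietly(200); // simulated blocking IO
                return String.format("String %d", i);
            })
            // subscribeOn moves the subscription (and so the whole chain) onto a
            // single worker; it does not spread elements across several threads.
            .subscribeOn(Schedulers.boundedElastic())
            .blockLast();
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        return new long[] {threads.size(), elapsedMs};
    }

    public static void main(String[] args) {
        long[] r = run();
        System.out.println("distinct threads: " + r[0] + ", elapsed ms: " + r[1]);
    }
}
```

With map, each element is processed to completion before the next one enters the operator, whichever Scheduler the chain runs on.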

Upvotes: 8

Views: 7283

Answers (2)

AlbertOu

Reputation: 59

  • I will use Flux.range(1, 100) as the source of events in this demo;
  • change map() to flatMap() with a concurrency of 3;
  • inside flatMap(), use Mono.fromCallable() to wrap your "IO delay code";
  • make the Mono returned by Mono.fromCallable() subscribe on a Scheduler via subscribeOn().

Here is the code:

    @Test
    void testParallelEvent(){
        System.out.println("Main thread: " + Thread.currentThread());
        Flux<String> tick = Flux.range(1, 100)
                .flatMap(i-> Mono.fromCallable(()->{
                    System.out.println("ReactorTests.test " + Thread.currentThread());
                    sleep(500L); // simulate IO delay
                    return String.format("String %d", i);
                    }).subscribeOn(Schedulers.boundedElastic())
                , 3) // concurrency
                .take(6);
//    .subscribeOn(Schedulers.elastic());
//    .subscribeOn(Schedulers.newParallel("test"));
//    .subscribeOn(Schedulers.fromExecutor(executor));

        tick.subscribe(x ->System.out.println("Subscribe thread: " + Thread.currentThread() + "  --> " + x),
                System.out::println,
                ()-> System.out.println("Done"));
        System.out.println("DONE AND DONE");
        sleep(8000);
    }

And the output is:

Main thread: Thread[main,5,main]
16:35:34.034 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
ReactorTests.test Thread[boundedElastic-1,5,main]
ReactorTests.test Thread[boundedElastic-2,5,main]
DONE AND DONE
ReactorTests.test Thread[boundedElastic-3,5,main]
Subscribe thread: Thread[boundedElastic-1,5,main]  --> String 1
Subscribe thread: Thread[boundedElastic-3,5,main]  --> String 3
ReactorTests.test Thread[boundedElastic-4,5,main]
ReactorTests.test Thread[boundedElastic-5,5,main]
Subscribe thread: Thread[boundedElastic-2,5,main]  --> String 2
ReactorTests.test Thread[boundedElastic-3,5,main]
Subscribe thread: Thread[boundedElastic-4,5,main]  --> String 4
ReactorTests.test Thread[boundedElastic-2,5,main]
Subscribe thread: Thread[boundedElastic-5,5,main]  --> String 5
ReactorTests.test Thread[boundedElastic-4,5,main]
Subscribe thread: Thread[boundedElastic-3,5,main]  --> String 6
Done
Exiting
Exiting
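To check that the concurrency argument really caps the number of simultaneous IO calls at 3, here is a sketch that counts in-flight calls with an AtomicInteger. It assumes Reactor 3.3+ on the classpath; sleepQuietly is a hypothetical helper playing the role of sleep, and the 12 items and 100 ms delay are arbitrary choices for the demo.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class FlatMapConcurrencyDemo {

    // Hypothetical stand-in for the sleep(long) helper from the question.
    static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Runs 12 simulated IO calls through flatMap(..., concurrency) and returns the
    // highest number of calls that were running at the same moment.
    public static int maxInFlight(int concurrency) {
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger max = new AtomicInteger();
        List<String> results = Flux.range(1, 12)
            .flatMap(i -> Mono.fromCallable(() -> {
                        int now = inFlight.incrementAndGet();
                        max.accumulateAndGet(now, Math::max);
                        try {
                            sleepQuietly(100); // simulated blocking IO
                            return String.format("String %d", i);
                        } finally {
                            inFlight.decrementAndGet();
                        }
                    })
                    .subscribeOn(Schedulers.boundedElastic()),
                concurrency) // upper bound on inner Monos subscribed at once
            .collectList()
            .block();
        if (results == null || results.size() != 12) {
            throw new IllegalStateException("expected 12 results, got " + results);
        }
        return max.get();
    }

    public static void main(String[] args) {
        System.out.println("max in flight with concurrency 3: " + maxInFlight(3));
    }
}
```

Because flatMap only subscribes to a new inner Mono when a slot frees up, the counter should never exceed the concurrency value, while several calls still overlap.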

If you need to use Flux.interval() as the event source, you must add a backpressure strategy; otherwise you will end up with an OverflowException:

Flux.interval(Duration.ofMillis(10))
                .onBackpressureBuffer(10) // backpressure strategy
                .flatMap(...)
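To see the failure this warns about, here is a sketch of the same pipeline without onBackpressureBuffer, assuming Reactor 3.3+ on the classpath (sleepQuietly is a hypothetical helper, and the 300 ms delay is a shortened stand-in for the 1000 ms IO delay). flatMap with a concurrency of 3 only requests 3 ticks up front, so the next tick from Flux.interval finds no outstanding demand and the sequence terminates with an overflow error, which reactor.core.Exceptions.isOverflow recognizes.

```java
import java.time.Duration;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class IntervalOverflowDemo {

    // Hypothetical stand-in for the sleep(long) helper from the question.
    static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Same shape as the answer's pipeline, but WITHOUT onBackpressureBuffer.
    // Returns the terminating error, or null if the pipeline completed normally.
    public static Throwable runWithoutBuffer() {
        try {
            Flux.interval(Duration.ofMillis(10))
                .flatMap(i -> Mono.fromCallable(() -> {
                            sleepQuietly(300); // much slower than one tick per 10 ms
                            return String.format("String %d", i);
                        })
                        .subscribeOn(Schedulers.boundedElastic()),
                    3)
                .take(10)
                .blockLast();
            return null;
        } catch (RuntimeException e) {
            return e; // blockLast rethrows the upstream error
        }
    }

    public static void main(String[] args) {
        Throwable t = runWithoutBuffer();
        System.out.println("overflow: " + Exceptions.isOverflow(t) + " (" + t + ")");
    }
}
```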

Here is the full source code with Flux.interval and onBackpressureBuffer:

    @Test
    void testParallelWithBackpressureBuffer(){
        System.out.println("Main thread: " + Thread.currentThread());
        Flux<String> tick = Flux.interval(Duration.ofMillis(10))
                .onBackpressureBuffer(10) // backpressure strategy
                .flatMap(i-> Mono.fromCallable(()->{
                            System.out.println("simulate IO " + Thread.currentThread() + "  " + i);
                            sleep(1000L); // simulate IO delay, very slow
                            return String.format("String %d", i);
                        }).subscribeOn(Schedulers.boundedElastic())
                        , 3)
                .take(10);

        Disposable disposable = tick.subscribe(x ->System.out.println("Subscribe thread: " + Thread.currentThread() + "  --> " + x),
                System.out::println,
                ()-> System.out.println("Done"));

        while(!disposable.isDisposed()){
            sleep(800);
            System.out.println("..wait..");
        }
        System.out.println("DONE AND DONE");

    }

And the output is:

Main thread: Thread[main,5,main]
15:08:52.854 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
simulate IO Thread[boundedElastic-1,5,main]  0
simulate IO Thread[boundedElastic-2,5,main]  1
simulate IO Thread[boundedElastic-3,5,main]  2
..wait..
Subscribe thread: Thread[boundedElastic-1,5,main]  --> String 0
Subscribe thread: Thread[boundedElastic-1,5,main]  --> String 1
Subscribe thread: Thread[boundedElastic-1,5,main]  --> String 2
simulate IO Thread[boundedElastic-4,5,main]  3
simulate IO Thread[boundedElastic-2,5,main]  4
simulate IO Thread[boundedElastic-3,5,main]  5
..wait..
Subscribe thread: Thread[boundedElastic-4,5,main]  --> String 3
simulate IO Thread[boundedElastic-1,5,main]  6
Subscribe thread: Thread[boundedElastic-2,5,main]  --> String 4
Subscribe thread: Thread[boundedElastic-2,5,main]  --> String 5
simulate IO Thread[boundedElastic-3,5,main]  7
simulate IO Thread[boundedElastic-4,5,main]  8
..wait..
Subscribe thread: Thread[boundedElastic-1,5,main]  --> String 6
simulate IO Thread[boundedElastic-2,5,main]  9
Subscribe thread: Thread[boundedElastic-3,5,main]  --> String 7
Subscribe thread: Thread[boundedElastic-3,5,main]  --> String 8
simulate IO Thread[boundedElastic-4,5,main]  10
simulate IO Thread[boundedElastic-1,5,main]  11
..wait..
..wait..
Subscribe thread: Thread[boundedElastic-2,5,main]  --> String 9
Done
Exiting
Exiting
..wait..
DONE AND DONE
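A caveat on the test above: per the onBackpressureBuffer(int) javadoc, once the 10-element buffer is full the next tick triggers an overflow error and cancels the source, but that error is delayed until the buffered items have been drained. Since the source here is far faster than the processing, the buffer fills within a few hundred milliseconds, and the test completes only because take(10) needs fewer items than the roughly 13 that got through (3 in flight plus 10 buffered). The sketch below checks this under stated assumptions: Reactor 3.3+ on the classpath, a hypothetical sleepQuietly helper, and 300 ms per item instead of 1000 ms so it runs quickly. It is timing-based, so the exact number of items that get through may vary by machine.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class BoundedBufferDemo {

    // Hypothetical stand-in for the sleep(long) helper from the question.
    static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Scaled-down copy of the answer's pipeline: a tick every 10 ms, 300 ms per
    // item, at most 3 items in flight, and a 10-element buffer in between.
    public static List<String> run(int takeCount) {
        return Flux.interval(Duration.ofMillis(10))
            .onBackpressureBuffer(10)
            .flatMap(i -> Mono.fromCallable(() -> {
                        sleepQuietly(300); // simulated blocking IO
                        return String.format("String %d", i);
                    })
                    .subscribeOn(Schedulers.boundedElastic()),
                3)
            .take(takeCount)
            .collectList()
            .block(); // throws if the pipeline terminates with an error
    }

    public static void main(String[] args) {
        System.out.println("take(10): " + run(10).size() + " items");
        try {
            run(20);
            System.out.println("take(20): completed");
        } catch (RuntimeException e) {
            System.out.println("take(20): overflow = " + Exceptions.isOverflow(e));
        }
    }
}
```

In other words, a bounded buffer postpones the overflow rather than preventing it whenever the source is permanently faster than the processing.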

Upvotes: 0

Phil Clay

Reputation: 4536

One way to handle items in parallel is to use .parallel() / .runOn():

flux
    .parallel(10)
    .runOn(scheduler)
    //
    // Work to be performed in parallel goes here.  (e.g. .map, .flatMap, etc)
    //
    // Then, if/when you're ready to go back to sequential, call .sequential()
    .sequential()

Blocking operations (such as blocking IO, or Thread.sleep) will block the thread on which they are executed. Reactive streams cannot magically turn a blocking method into a non-blocking method. Therefore, you need to ensure blocking methods are run on a Scheduler suitable for blocking operations (e.g. Schedulers.boundedElastic()).

In the example above, since you know you are calling a blocking operation, you could use .runOn(Schedulers.boundedElastic()).
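A runnable sketch of the .parallel / .runOn approach with Schedulers.boundedElastic(), assuming Reactor 3.3+ on the classpath. The 10 items, 10 rails and 200 ms delay are arbitrary demo values, and sleepQuietly is a hypothetical helper standing in for the blocking call. It records which threads run the work and how long the whole batch takes; done sequentially the batch would take about 2 seconds.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public class ParallelRunOnDemo {

    // Hypothetical stand-in for a blocking IO call.
    static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Returns {distinct threads used by the rails, elapsed milliseconds, result count}.
    public static long[] run() {
        Set<String> threads = ConcurrentHashMap.newKeySet();
        long start = System.nanoTime();
        List<String> results = Flux.range(1, 10)
            .parallel(10)                         // split the items across 10 rails
            .runOn(Schedulers.boundedElastic())   // each rail runs on its own worker
            .map(i -> {
                threads.add(Thread.currentThread().getName());
                sleepQuietly(200);                // blocking work, now done per rail
                return String.format("String %d", i);
            })
            .sequential()                         // merge the rails back into one Flux
            .collectList()
            .block();
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        return new long[] {threads.size(), elapsedMs, results == null ? 0 : results.size()};
    }

    public static void main(String[] args) {
        long[] r = run();
        System.out.println("threads: " + r[0] + ", elapsed ms: " + r[1] + ", results: " + r[2]);
    }
}
```

Note that .sequential() merges the rails as results arrive, so the output order is not guaranteed to match the input order.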

Depending on the use case, you can also use async operators like .flatMap in combination with .subscribeOn or .publishOn to delegate specific blocking operations to another Scheduler, as described in the Project Reactor documentation. For example:

flux
    .flatMap(i -> Mono.fromCallable(() -> {
            System.out.println("ReactorTests.test " + Thread.currentThread());
            sleep(1000L); // simulate IO delay
            return String.format("String %d", i);
        })
        .subscribeOn(Schedulers.boundedElastic()))
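To show that the concurrency comes from the async operator eagerly subscribing to several inner Monos, rather than from subscribeOn alone, here is a sketch that times the same inner Mono through flatMap and through concatMap. concatMap is a standard Flux operator swapped in here only for comparison: it subscribes to one inner publisher at a time. The sketch assumes Reactor 3.3+ on the classpath; blockingCall and sleepQuietly are hypothetical helpers, and 5 items of 300 ms each are arbitrary demo values.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class FlatMapVsConcatMapDemo {

    // Hypothetical stand-in for a blocking IO call.
    static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // The inner Mono from the answer: blocking work deferred to boundedElastic.
    static Mono<String> blockingCall(int i) {
        return Mono.fromCallable(() -> {
                    sleepQuietly(300); // simulated blocking IO
                    return String.format("String %d", i);
                })
                .subscribeOn(Schedulers.boundedElastic());
    }

    // Subscribes to the (lazy) flux, waits for all 5 results, returns elapsed ms.
    private static long timeMillis(Flux<String> flux) {
        long start = System.nanoTime();
        List<String> results = flux.collectList().block();
        if (results == null || results.size() != 5) {
            throw new IllegalStateException("expected 5 results, got " + results);
        }
        return (System.nanoTime() - start) / 1_000_000;
    }

    // flatMap subscribes to all inner Monos eagerly, so the calls overlap.
    public static long flatMapMillis() {
        return timeMillis(Flux.range(1, 5).flatMap(i -> blockingCall(i)));
    }

    // concatMap waits for each inner Mono to complete before the next one starts.
    public static long concatMapMillis() {
        return timeMillis(Flux.range(1, 5).concatMap(i -> blockingCall(i)));
    }

    public static void main(String[] args) {
        System.out.println("flatMap:   " + flatMapMillis() + " ms");
        System.out.println("concatMap: " + concatMapMillis() + " ms");
    }
}
```

Expect flatMap to finish in roughly the time of a single call and concatMap in roughly five times that; the exact numbers depend on the machine.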

In fact, .flatMap also has an overloaded variant that takes a concurrency parameter, which limits the maximum number of in-flight inner sequences. This can be used instead of .parallel in some use cases. It will not generally work for Flux.interval, though, since Flux.interval doesn't support downstream requests that replenish slower than the ticks.

Upvotes: 11
