Reputation: 171
I have streams of incoming events that need to be enriched, and then processed in parallel as they arrive.
I was thinking Project Reactor was made to order for the job, but in my tests all of the processing seems to be done serially.
Here is some test code:
ExecutorService executor = Executors.newFixedThreadPool(10);
System.out.println("Main thread: " + Thread.currentThread());
Flux<String> tick = Flux.interval(Duration.of(10, ChronoUnit.MILLIS))
        .map(i -> {
            System.out.println("ReactorTests.test " + Thread.currentThread());
            sleep(1000L); // simulate IO delay
            return String.format("String %d", i);
        })
        .take(3)
        // .subscribeOn(Schedulers.elastic());
        // .subscribeOn(Schedulers.newParallel("test"));
        // .subscribeOn(Schedulers.fromExecutor(executor));
        ;
tick.subscribe(x -> System.out.println("Subscribe thread: " + Thread.currentThread()),
        System.out::println,
        () -> System.out.println("Done"));
System.out.println("DONE AND DONE");
System.out.println("DONE AND DONE");
I have tried uncommenting each of the commented lines in turn; in every case, the output shows that the same thread processes all of the events:
Main thread: Thread[main,5,main]
[DEBUG] (main) Using Console logging
DONE AND DONE
ReactorTests.test Thread[parallel-1,5,main]
Subscribe thread: Thread[parallel-1,5,main]
ReactorTests.test Thread[parallel-1,5,main]
Subscribe thread: Thread[parallel-1,5,main]
ReactorTests.test Thread[parallel-1,5,main]
Subscribe thread: Thread[parallel-1,5,main]
Done
(The only difference is that without the Schedulers, they are run on the subscribe thread, whereas with any of the executors, they all run in the same thread, which is not the subscribe thread.)
What am I missing?
FYI, here is the "sleep" helper method:
public static void sleep(long time) {
    try {
        Thread.sleep(time);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the interrupt status for the caller
        System.out.println("Exiting");
    }
}
Upvotes: 8
Views: 7283
Reputation: 59
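Wrap each blocking call in a Mono, move it to Schedulers.boundedElastic() with subscribeOn, and let flatMap subscribe to several of them at once (its second argument limits the concurrency). Here is the code: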
@Test
void testParallelEvent() {
    System.out.println("Main thread: " + Thread.currentThread());
    Flux<String> tick = Flux.range(1, 100)
            .flatMap(i -> Mono.fromCallable(() -> {
                        System.out.println("ReactorTests.test " + Thread.currentThread());
                        sleep(500L); // simulate IO delay
                        return String.format("String %d", i);
                    }).subscribeOn(Schedulers.boundedElastic()),
                    3) // concurrency
            .take(6);
    // .subscribeOn(Schedulers.elastic());
    // .subscribeOn(Schedulers.newParallel("test"));
    // .subscribeOn(Schedulers.fromExecutor(executor));
    tick.subscribe(x -> System.out.println("Subscribe thread: " + Thread.currentThread() + " --> " + x),
            System.out::println,
            () -> System.out.println("Done"));
    System.out.println("DONE AND DONE");
    sleep(8000);
}
The output is:
Main thread: Thread[main,5,main]
16:35:34.034 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
ReactorTests.test Thread[boundedElastic-1,5,main]
ReactorTests.test Thread[boundedElastic-2,5,main]
DONE AND DONE
ReactorTests.test Thread[boundedElastic-3,5,main]
Subscribe thread: Thread[boundedElastic-1,5,main] --> String 1
Subscribe thread: Thread[boundedElastic-3,5,main] --> String 3
ReactorTests.test Thread[boundedElastic-4,5,main]
ReactorTests.test Thread[boundedElastic-5,5,main]
Subscribe thread: Thread[boundedElastic-2,5,main] --> String 2
ReactorTests.test Thread[boundedElastic-3,5,main]
Subscribe thread: Thread[boundedElastic-4,5,main] --> String 4
ReactorTests.test Thread[boundedElastic-2,5,main]
Subscribe thread: Thread[boundedElastic-5,5,main] --> String 5
ReactorTests.test Thread[boundedElastic-4,5,main]
Subscribe thread: Thread[boundedElastic-3,5,main] --> String 6
Done
Exiting
Exiting
If you need to use Flux.interval() as the event source, you must add a backpressure strategy; otherwise you will end up with an OverflowException:
Flux.interval(Duration.ofMillis(10))
.onBackpressureBuffer(10) // backpressure strategy
.flatMap(...)
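To see why the strategy matters, here is a minimal sketch, assuming a recent reactor-core 3.x on the classpath; the class and method names are hypothetical, and a non-blocking Mono.delay stands in for the slow IO. With flatMap(..., 1) and no strategy, the tick that arrives right after the first one finds no outstanding demand and the pipeline fails with an overflow error. With onBackpressureDrop() (a different strategy from the buffer used in this answer), the surplus ticks are simply discarded.

```java
import java.time.Duration;
import java.util.List;

import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class IntervalBackpressureSketch {

    // Hypothetical slow step: a non-blocking 50 ms delay standing in for IO.
    static Mono<Long> slowStep(long tick) {
        return Mono.delay(Duration.ofMillis(50)).thenReturn(tick);
    }

    // No strategy: flatMap(..., 1) requests one tick at a time, so the tick that
    // arrives about 1 ms later has no demand and interval signals an overflow error.
    static boolean overflowsWithoutStrategy() {
        try {
            Flux.interval(Duration.ofMillis(1))
                    .flatMap(IntervalBackpressureSketch::slowStep, 1)
                    .take(3)
                    .collectList()
                    .block();
            return false;
        } catch (RuntimeException e) {
            return Exceptions.isOverflow(e);
        }
    }

    // onBackpressureDrop() requests unbounded ticks from interval and discards
    // any tick that arrives while the downstream has no outstanding demand.
    static List<Long> withDropStrategy() {
        return Flux.interval(Duration.ofMillis(1))
                .onBackpressureDrop()
                .flatMap(IntervalBackpressureSketch::slowStep, 1)
                .take(3)
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println("Overflow without a strategy: " + overflowsWithoutStrategy());
        System.out.println("Ticks kept with onBackpressureDrop: " + withDropStrategy());
    }
}
```

Dropping is only appropriate when losing ticks is acceptable; onBackpressureBuffer keeps them, up to its size limit.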
Here is the full source code with Flux.interval and onBackpressureBuffer:
@Test
void testParallelWithBackpressureBuffer() {
    System.out.println("Main thread: " + Thread.currentThread());
    Flux<String> tick = Flux.interval(Duration.ofMillis(10))
            .onBackpressureBuffer(10) // backpressure strategy
            .flatMap(i -> Mono.fromCallable(() -> {
                        System.out.println("simulate IO " + Thread.currentThread() + " " + i);
                        sleep(1000L); // simulate IO delay, very slow
                        return String.format("String %d", i);
                    }).subscribeOn(Schedulers.boundedElastic()),
                    3)
            .take(10);
    Disposable disposable = tick.subscribe(x -> System.out.println("Subscribe thread: " + Thread.currentThread() + " --> " + x),
            System.out::println,
            () -> System.out.println("Done"));
    while (!disposable.isDisposed()) {
        sleep(800);
        System.out.println("..wait..");
    }
    System.out.println("DONE AND DONE");
}
The result is:
Main thread: Thread[main,5,main]
15:08:52.854 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
simulate IO Thread[boundedElastic-1,5,main] 0
simulate IO Thread[boundedElastic-2,5,main] 1
simulate IO Thread[boundedElastic-3,5,main] 2
..wait..
Subscribe thread: Thread[boundedElastic-1,5,main] --> String 0
Subscribe thread: Thread[boundedElastic-1,5,main] --> String 1
Subscribe thread: Thread[boundedElastic-1,5,main] --> String 2
simulate IO Thread[boundedElastic-4,5,main] 3
simulate IO Thread[boundedElastic-2,5,main] 4
simulate IO Thread[boundedElastic-3,5,main] 5
..wait..
Subscribe thread: Thread[boundedElastic-4,5,main] --> String 3
simulate IO Thread[boundedElastic-1,5,main] 6
Subscribe thread: Thread[boundedElastic-2,5,main] --> String 4
Subscribe thread: Thread[boundedElastic-2,5,main] --> String 5
simulate IO Thread[boundedElastic-3,5,main] 7
simulate IO Thread[boundedElastic-4,5,main] 8
..wait..
Subscribe thread: Thread[boundedElastic-1,5,main] --> String 6
simulate IO Thread[boundedElastic-2,5,main] 9
Subscribe thread: Thread[boundedElastic-3,5,main] --> String 7
Subscribe thread: Thread[boundedElastic-3,5,main] --> String 8
simulate IO Thread[boundedElastic-4,5,main] 10
simulate IO Thread[boundedElastic-1,5,main] 11
..wait..
..wait..
Subscribe thread: Thread[boundedElastic-2,5,main] --> String 9
Done
Exiting
Exiting
..wait..
DONE AND DONE
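In a test, the polling loop can be replaced by blocking on the result. This is a minimal sketch, assuming reactor-core 3.3+ (for boundedElastic) and a shorter 100 ms simulated delay so it finishes quickly; simulateIo and the class name are hypothetical stand-ins for the sleep helper and the test class. If I read the onBackpressureBuffer(int) Javadoc correctly, the buffer still overflows once more than ten ticks are waiting, but that error is delayed until the buffered ticks have been consumed, which is why take(10) completes first here.

```java
import java.time.Duration;
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

class IntervalBlockingWaitSketch {

    // Hypothetical blocking IO stand-in. The sleep may be interrupted when take(10)
    // cancels calls that are still in flight; restore the flag and return.
    static void simulateIo(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    static List<String> run() {
        return Flux.interval(Duration.ofMillis(10))
                .onBackpressureBuffer(10)
                .flatMap(i -> Mono.fromCallable(() -> {
                            simulateIo(100L);
                            return String.format("String %d", i);
                        }).subscribeOn(Schedulers.boundedElastic()),
                        3)
                .take(10)
                .collectList()
                // Wait here (at most 10 s) instead of polling disposable.isDisposed().
                .block(Duration.ofSeconds(10));
    }

    public static void main(String[] args) {
        List<String> results = run();
        System.out.println(results.size() + " results: " + results);
    }
}
```

Because flatMap does not preserve order, the list is not necessarily sorted; use flatMapSequential if order matters.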
Upvotes: 0
Reputation: 4536
One way to handle items in parallel is to use .parallel / .runOn:
flux
    .parallel(10)
    .runOn(scheduler)
    //
    // Work to be performed in parallel goes here (e.g. .map, .flatMap, etc.)
    //
    // Then, if/when you're ready to go back to sequential, call .sequential()
    .sequential()
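A runnable version of the snippet above, as a sketch assuming reactor-core 3.3+: Flux.range stands in for the unspecified flux, Schedulers.boundedElastic() is used as the scheduler because the work blocks, and simulateIo is a hypothetical blocking call. It records which threads run the map step, so you can check that the rails really execute on different workers.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

class ParallelRunOnSketch {

    // Names of the threads that executed the map step (filled in by run()).
    static final Set<String> WORKER_THREADS = ConcurrentHashMap.newKeySet();

    // Hypothetical blocking call standing in for the real IO.
    static void simulateIo(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    static List<String> run() {
        return Flux.range(1, 10)
                .parallel(10)                       // split the stream into 10 rails
                .runOn(Schedulers.boundedElastic()) // each rail gets its own worker
                .map(i -> {
                    WORKER_THREADS.add(Thread.currentThread().getName());
                    simulateIo(200L);
                    return String.format("String %d", i);
                })
                .sequential()                       // merge the rails back into one Flux
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(run());
        System.out.println("Threads used: " + WORKER_THREADS);
    }
}
```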
Blocking operations (such as blocking IO, or Thread.sleep) will block the thread on which they are executed. Reactive streams cannot magically turn a blocking method into a non-blocking method. Therefore, you need to ensure blocking methods are run on a Scheduler suitable for blocking operations (e.g. Schedulers.boundedElastic()).
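One way to check which schedulers Reactor itself treats as non-blocking is Schedulers.isInNonBlockingThread(). A minimal sketch, assuming reactor-core 3.3 or later (the class and method names are hypothetical): threads of Schedulers.parallel() are flagged as non-blocking, and Reactor's blocking operators such as block() typically throw if called there, while Schedulers.boundedElastic() threads are not flagged.

```java
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

class NonBlockingThreadCheckSketch {

    // Runs Schedulers.isInNonBlockingThread() on a thread of the given scheduler.
    static boolean isNonBlocking(Scheduler scheduler) {
        Boolean flagged = Mono.fromCallable(Schedulers::isInNonBlockingThread)
                .subscribeOn(scheduler)
                .block();
        return Boolean.TRUE.equals(flagged);
    }

    public static void main(String[] args) {
        System.out.println("parallel():       non-blocking = " + isNonBlocking(Schedulers.parallel()));
        System.out.println("boundedElastic(): non-blocking = " + isNonBlocking(Schedulers.boundedElastic()));
    }
}
```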
In the example above, since you know you are calling a blocking operation, you could use .runOn(Schedulers.boundedElastic()).
Depending on the use case, you can also use async operators like .flatMap in combination with .subscribeOn or .publishOn to delegate specific blocking operations to another Scheduler, as described in the Project Reactor docs. For example:
flux
    .flatMap(i -> Mono.fromCallable(() -> {
                System.out.println("ReactorTests.test " + Thread.currentThread());
                sleep(1000L); // simulate IO delay
                return String.format("String %d", i);
            })
            .subscribeOn(Schedulers.boundedElastic()))
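This also explains the behaviour in the question: subscribeOn and publishOn move work to another thread, but Reactive Streams requires onNext signals to a subscriber to be delivered serially, so a plain .map still sees one element at a time, on one worker. The sketch below (assuming reactor-core 3.3+, with hypothetical class and method names and a simulated delay) records the threads used in each case; the publishOn version should report a single thread and the flatMap version several.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

class SerialVsFlatMapSketch {

    // Hypothetical blocking call standing in for the real IO.
    static void simulateIo(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // publishOn hands every element to a single worker, so map runs serially on one thread.
    static Set<String> threadsWithPublishOn() {
        Set<String> threads = ConcurrentHashMap.newKeySet();
        Flux.range(1, 6)
                .publishOn(Schedulers.boundedElastic())
                .map(i -> {
                    threads.add(Thread.currentThread().getName());
                    simulateIo(50L);
                    return i;
                })
                .blockLast();
        return threads;
    }

    // flatMap subscribes to each inner Mono separately, and subscribeOn gives each
    // subscription its own worker, so several calls can be in flight at once.
    static Set<String> threadsWithFlatMap() {
        Set<String> threads = ConcurrentHashMap.newKeySet();
        Flux.range(1, 6)
                .flatMap(i -> Mono.fromCallable(() -> {
                    threads.add(Thread.currentThread().getName());
                    simulateIo(200L);
                    return i;
                }).subscribeOn(Schedulers.boundedElastic()))
                .blockLast();
        return threads;
    }

    public static void main(String[] args) {
        System.out.println("publishOn + map: " + threadsWithPublishOn());
        System.out.println("flatMap + subscribeOn: " + threadsWithFlatMap());
    }
}
```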
In fact, .flatMap also has an overloaded variant that takes a concurrency parameter, which limits the maximum number of in-flight inner sequences. This can be used instead of .parallel in some use cases. It will not generally work for Flux.interval, though, since Flux.interval doesn't support downstream requests that replenish slower than the ticks.
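To see the concurrency parameter at work, the following sketch (assuming reactor-core 3.3+; the class and method names are hypothetical) counts how many inner Monos are running at the same time. With flatMap(..., 3) the peak should never exceed 3, and with flatMap(..., 1) the calls run strictly one after another.

```java
import java.util.concurrent.atomic.AtomicInteger;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

class FlatMapConcurrencySketch {

    // Returns the highest number of inner calls observed running at the same time.
    static int peakInFlight(int concurrency) {
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        Flux.range(1, 6)
                .flatMap(i -> Mono.fromCallable(() -> {
                    peak.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
                    try {
                        Thread.sleep(100L); // simulated blocking IO
                    } finally {
                        inFlight.decrementAndGet();
                    }
                    return i;
                }).subscribeOn(Schedulers.boundedElastic()), concurrency)
                .blockLast();
        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println("peak with concurrency 3: " + peakInFlight(3));
        System.out.println("peak with concurrency 1: " + peakInFlight(1));
    }
}
```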
Upvotes: 11