Reputation: 173
I am trying to create a flow where a Flux emits 10 items that are processed in parallel, with each item sleeping for 1s. Since I expected each item to be published on a separate thread, I expected the entire process to take 1s. But the logs show that it takes 10s instead.
I tried changing subscribeOn to publishOn and map to doOnNext, but neither change made a difference.
I am new to Reactor and am trying to understand where I am going wrong. Any help would be most appreciated. Thanks
public void whenIsANewThreadCreated() {
Flux.range(1,10)
.publishOn(Schedulers.elastic())
.map(count -> {
logger.info(Thread.currentThread().getName() + " - Sleeping for 1s" + " with count: " + count);
try {
Thread.sleep(1_000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return count;
})
.blockLast();
}
2020-03-30 16:17:29.799 INFO 15744 --- [ elastic-2] com.example.demo.DemoApplicationTests : elastic-2 - Sleeping for 1s with count: 1
2020-03-30 16:17:30.802 INFO 15744 --- [ elastic-2] com.example.demo.DemoApplicationTests : elastic-2 - Sleeping for 1s with count: 2
2020-03-30 16:17:31.804 INFO 15744 --- [ elastic-2] com.example.demo.DemoApplicationTests : elastic-2 - Sleeping for 1s with count: 3
2020-03-30 16:17:32.805 INFO 15744 --- [ elastic-2] com.example.demo.DemoApplicationTests : elastic-2 - Sleeping for 1s with count: 4
2020-03-30 16:17:33.806 INFO 15744 --- [ elastic-2] com.example.demo.DemoApplicationTests : elastic-2 - Sleeping for 1s with count: 5
2020-03-30 16:17:34.808 INFO 15744 --- [ elastic-2] com.example.demo.DemoApplicationTests : elastic-2 - Sleeping for 1s with count: 6
2020-03-30 16:17:35.809 INFO 15744 --- [ elastic-2] com.example.demo.DemoApplicationTests : elastic-2 - Sleeping for 1s with count: 7
2020-03-30 16:17:36.811 INFO 15744 --- [ elastic-2] com.example.demo.DemoApplicationTests : elastic-2 - Sleeping for 1s with count: 8
2020-03-30 16:17:37.812 INFO 15744 --- [ elastic-2] com.example.demo.DemoApplicationTests : elastic-2 - Sleeping for 1s with count: 9
2020-03-30 16:17:38.814 INFO 15744 --- [ elastic-2] com.example.demo.DemoApplicationTests : elastic-2 - Sleeping for 1s with count: 10
Upvotes: 1
Views: 1221
Reputation: 1
Reference: https://projectreactor.io/docs/core/release/reference/#faq.wrap-blocking
You can start those processes in the background and obtain multithreading. Note that this is not parallelism. Use the parallel scheduler for CPU-intensive tasks, and an elastic scheduler for I/O or other blocking operations.
public void whenIsANewThreadCreated() {
    Flux.range(1, 10)
        .subscribeOn(Schedulers.boundedElastic()) // if not, main calling thread will be used
        .flatMap(count -> {
            log.info(Thread.currentThread().getName() + " - Sleeping for 1s" + " with count: " + count);
            // fromCallable defers the blocking call; the inner subscribeOn runs it on its own worker
            return Mono.fromCallable(() -> method5(count)).subscribeOn(Schedulers.boundedElastic());
        })
        .blockLast();
}

// Blocking call. Returns a plain value so that fromCallable yields Mono<Integer>, not Mono<Mono<Integer>>.
Integer method5(int count) {
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // restore the interrupt flag
    }
    return count;
}
and you get something like this
23:42:33.289 [boundedElastic-1] INFO com.personal.ReactiveTest - boundedElastic-1 - Sleeping for 1s with count: 1
23:42:33.342 [boundedElastic-1] INFO com.personal.ReactiveTest - boundedElastic-1 - Sleeping for 1s with count: 2
23:42:33.342 [boundedElastic-1] INFO com.personal.ReactiveTest - boundedElastic-1 - Sleeping for 1s with count: 3
23:42:33.342 [boundedElastic-1] INFO com.personal.ReactiveTest - boundedElastic-1 - Sleeping for 1s with count: 4
23:42:33.343 [boundedElastic-1] INFO com.personal.ReactiveTest - boundedElastic-1 - Sleeping for 1s with count: 5
23:42:33.343 [boundedElastic-1] INFO com.personal.ReactiveTest - boundedElastic-1 - Sleeping for 1s with count: 6
23:42:33.343 [boundedElastic-1] INFO com.personal.ReactiveTest - boundedElastic-1 - Sleeping for 1s with count: 7
23:42:33.343 [boundedElastic-1] INFO com.personal.ReactiveTest - boundedElastic-1 - Sleeping for 1s with count: 8
23:42:33.344 [boundedElastic-1] INFO com.personal.ReactiveTest - boundedElastic-1 - Sleeping for 1s with count: 9
23:42:33.344 [boundedElastic-1] INFO com.personal.ReactiveTest - boundedElastic-1 - Sleeping for 1s with count: 10
Upvotes: 0
Reputation: 28351
The Reactive Streams specification mandates that onNext events are invoked serially. Your map is effectively turning input onNext events into onNext events that block for 1 second. Per the spec, 10 incoming onNext lead to a series of 10 outgoing onNext that each block for 1s => 10s of blocking.
You absolutely 100% HAVE to use parallel(10).runOn(Schedulers.elastic()) if you want to distribute that blocking workload on 10 parallel rails. (The Scheduler for runOn can also be Schedulers.boundedElastic() or Schedulers.newParallel("name", 10).)
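To make the arithmetic concrete without pulling in Reactor, here is a rough plain-JDK analogy (not Reactor code): a 1-thread pool stands in for serialized onNext delivery, and a 10-thread pool stands in for 10 rails. The class name SerialVsRails and the helper timeBlockingTasks are made up for this illustration, and the sleep is shortened to 100 ms so it runs quickly.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SerialVsRails {

    // Hypothetical helper: runs `tasks` blocking tasks of `sleepMs` each on a
    // pool of `threads` threads and returns the elapsed wall-clock time in ms.
    static long timeBlockingTasks(int tasks, int threads, long sleepMs) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            long start = System.nanoTime();
            List<Future<?>> futures = new ArrayList<>();
            for (int i = 0; i < tasks; i++) {
                futures.add(pool.submit(() -> {
                    try {
                        Thread.sleep(sleepMs); // stands in for the blocking map body
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }));
            }
            // Wait for every task, like blockLast() waits for the last element.
            for (Future<?> f : futures) {
                try {
                    f.get();
                } catch (Exception e) {
                    throw new IllegalStateException(e);
                }
            }
            return (System.nanoTime() - start) / 1_000_000;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // One thread: the sleeps run back to back, so the total is roughly tasks * sleepMs.
        System.out.println("1 thread:   " + timeBlockingTasks(10, 1, 100) + " ms");
        // Ten threads: the sleeps overlap, so the total is roughly one sleepMs.
        System.out.println("10 threads: " + timeBlockingTasks(10, 10, 100) + " ms");
    }
}
```

The single-thread run corresponds to what publishOn plus map gives you: one worker, items handled one after another. The 10-thread run corresponds to parallel(10).runOn(...), where each rail gets its own worker.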
Upvotes: 1
Reputation: 15400
You have to first create a parallel flux by calling the parallel method, and then use runOn to achieve parallelism.
Flux.range(1,10)
.parallel()
.runOn(Schedulers.elastic())
.map(count -> {
System.out.println(Thread.currentThread().getName() + " - Sleeping for 1s" + " with count: " + count);
try {
Thread.sleep(1_000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return count;
}).subscribe();
Use Schedulers.boundedElastic(), as using Schedulers.elastic() is discouraged.
By default, parallel creates as many rails as you have CPU cores. If you want more, use parallel(10). I think this is what you want to see.
Upvotes: 2