Reputation: 75
I'm new to the Reactor, so I'd like to know, why is merging in this sample code working not as it's subscribed in the documentation. One flux returns odd numbers with a delay of 3 secs and other returns evens with a delay of 2 secs, but when I subscribe, it prints results of the first flux and then of the second one sequentially, but not interspersed. Return is 2, 4, 6, 8, .... 1, 3, 5, 7 etc, while I'm expecting 2, 4, 3, 6, 8, 5, 10, 7 etc. Other words: why don't two sourse fluces run in parallel? Here's a sample code
public class Main {
public static void main(String[] args) throws InterruptedException {
Flux.merge(fun1(10), fun2(10)).subscribe(System.out::println);
}
public static Flux<Integer> fun1(int i) {
return Flux.range(1, i).map(integer -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return integer * 2;
});
}
public static Flux<Integer> fun2(int i) {
return Flux.range(1, i).map(integer -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return integer * 2 + 1;
});
}
}
Upvotes: 1
Views: 904
Reputation: 4642
By default, the subscription
happens on the calling thread, which in this case would be main
. It first enters fun1(int i)
, and completes the inner flux
. Then it enters fun2(int i)
, and finishes the second flux
. Reactor's pipelines run on the calling thread by default (unlike say CompletableFuture<T>
that runs on the fork-join
pool by default). You can see interweaved output in the following code:
Flux.merge(fun1(10).subscribeOn(Schedulers.parallel()), fun2(10).subscribeOn(Schedulers.parallel())).subscribe(System.out::println);
Consider using Flux.delay
instead of blocking the thread by using Thread.sleep
.
Upvotes: 0
Reputation: 9937
Reactor is concurrency agnostic: by default everything runs on the main thread. In your example this means that Thread.sleep
is blocking the progress in the main thread that's why the second Flux
only starts to work after the first. To fix this...
1. You can use delay
operator which waits in a non-blocking manner and internally switches to a different thread.
public static void main(String[] args) {
Flux.merge(fun1(10), fun2(10))
.doOnNext(System.out::println)
.blockLast(); // block so main thread waits for background threads to finish
}
public static Flux<Integer> fun1(int i) {
return Flux.range(1, i)
.concatMap(integer -> Mono.delay(Duration.ofMillis(2000)).thenReturn(integer * 2));
}
public static Flux<Integer> fun2(int i) {
return Flux.range(1, i)
.concatMap(integer -> Mono.delay(Duration.ofMillis(3000)).thenReturn(integer * 2 + 1));
}
2. You can switch to a different thread manually using a Scheduler
. In most real-world use cases you'll need this as usually instead of Thread.sleep
you have for example a blocking database call.
public static void main(String[] args) {
Flux.merge(fun1(10), fun2(10))
.doOnNext(System.out::println)
.blockLast(); // block so main thread waits for background threads to finish
}
public static Flux<Integer> fun1(int i) {
return Flux.range(1, i).map(integer -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return integer * 2;
}).subscribeOn(Schedulers.boundedElastic());
}
public static Flux<Integer> fun2(int i) {
return Flux.range(1, i).map(integer -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return integer * 2 + 1;
}).subscribeOn(Schedulers.boundedElastic());
}
Note that though, if you are using a reactive non-blocking client (Spring WebClient, R2DBC, etc.) you don't need to specify any Scheduler
explicitly as these clients take care of that for you just like in case of the delay
operator.
Useful read in reference docs: https://projectreactor.io/docs/core/release/reference/#schedulers
Upvotes: 3