vetal22331122
vetal22331122

Reputation: 75

Why merged Fluxes do not interleave concurrently?

I'm new to the Reactor, so I'd like to know, why is merging in this sample code working not as it's subscribed in the documentation. One flux returns odd numbers with a delay of 3 secs and other returns evens with a delay of 2 secs, but when I subscribe, it prints results of the first flux and then of the second one sequentially, but not interspersed. Return is 2, 4, 6, 8, .... 1, 3, 5, 7 etc, while I'm expecting 2, 4, 3, 6, 8, 5, 10, 7 etc. Other words: why don't two sourse fluces run in parallel? Here's a sample code

public class Main {
    public static void main(String[] args) throws InterruptedException {
        Flux.merge(fun1(10), fun2(10)).subscribe(System.out::println);
    }

    public static Flux<Integer> fun1(int i) {
        return Flux.range(1, i).map(integer -> {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return integer * 2;
        });
    }

    public static Flux<Integer> fun2(int i) {
        return Flux.range(1, i).map(integer -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return integer * 2 + 1;
        });
    }
}

Upvotes: 1

Views: 904

Answers (2)

Prashant Pandey
Prashant Pandey

Reputation: 4642

By default, the subscription happens on the calling thread, which in this case would be main. It first enters fun1(int i), and completes the inner flux. Then it enters fun2(int i), and finishes the second flux. Reactor's pipelines run on the calling thread by default (unlike say CompletableFuture<T> that runs on the fork-join pool by default). You can see interweaved output in the following code:

Flux.merge(fun1(10).subscribeOn(Schedulers.parallel()), fun2(10).subscribeOn(Schedulers.parallel())).subscribe(System.out::println);

Consider using Flux.delay instead of blocking the thread by using Thread.sleep.

Upvotes: 0

Martin Tarj&#225;nyi
Martin Tarj&#225;nyi

Reputation: 9937

Reactor is concurrency agnostic: by default everything runs on the main thread. In your example this means that Thread.sleep is blocking the progress in the main thread that's why the second Flux only starts to work after the first. To fix this...

1. You can use delay operator which waits in a non-blocking manner and internally switches to a different thread.

public static void main(String[] args) {
    Flux.merge(fun1(10), fun2(10))
            .doOnNext(System.out::println)
            .blockLast(); // block so main thread waits for background threads to finish
}

public static Flux<Integer> fun1(int i) {
    return Flux.range(1, i)
            .concatMap(integer -> Mono.delay(Duration.ofMillis(2000)).thenReturn(integer * 2));
}

public static Flux<Integer> fun2(int i) {
    return Flux.range(1, i)
            .concatMap(integer -> Mono.delay(Duration.ofMillis(3000)).thenReturn(integer * 2 + 1));
}

2. You can switch to a different thread manually using a Scheduler. In most real-world use cases you'll need this as usually instead of Thread.sleep you have for example a blocking database call.

public static void main(String[] args) {
    Flux.merge(fun1(10), fun2(10))
            .doOnNext(System.out::println)
            .blockLast(); // block so main thread waits for background threads to finish
}

public static Flux<Integer> fun1(int i) {
    return Flux.range(1, i).map(integer -> {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return integer * 2;
    }).subscribeOn(Schedulers.boundedElastic());
}

public static Flux<Integer> fun2(int i) {
    return Flux.range(1, i).map(integer -> {
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return integer * 2 + 1;
    }).subscribeOn(Schedulers.boundedElastic());
}

Note that though, if you are using a reactive non-blocking client (Spring WebClient, R2DBC, etc.) you don't need to specify any Scheduler explicitly as these clients take care of that for you just like in case of the delay operator.

Useful read in reference docs: https://projectreactor.io/docs/core/release/reference/#schedulers

Upvotes: 3

Related Questions