Farhan
Farhan

Reputation: 431

Project Reactor - Parallel Execution

I have the below Flux,

@Test
public void fluxWithRange_CustomTest() {
    Flux<Integer> intFlux = Flux.range(1, 10).flatMap(i -> {
        if (i % 2 == 0) {
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            return Mono.just(i);
        } else {
            return Mono.just(i);
        }
    }, 2).subscribeOn(Schedulers.newBoundedElastic(2, 2, "test")).log();

    StepVerifier.create(intFlux).expectNext(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).verifyComplete();
}

I was expecting this to run in parallel, however, this just executes in 1 thread.

Upvotes: 1

Views: 812

Answers (1)

MissingSemiColon
MissingSemiColon

Reputation: 364

The subscribeOn method only provides a way to move execution to a different thread when "someone" subscribes to your Flux. What it means is that when you use the StepVerifier you are subscribing to the flux, and because you defined a Schedulers the execution is moved to one of the threads provided by the Schedulers. This does not imply that the Flux is going to be jumping between multiples threads.

The behaviour you are expecting can be archived by adding a second subscribeOn but to the Mono you are using inside the flatMap. When the flatMap now subscribes to the content it will use another thread.

If you change your code to something like this:

  @Test
  public void fluxWithRange_CustomTest() throws InterruptedException {
    Flux<Integer> intFlux = Flux.range(1, 10)
      .flatMap(i -> subFlux(i),2)
      .subscribeOn(Schedulers.newBoundedElastic(2, 2, "test")).log();

    StepVerifier.create(intFlux).expectNext(1, 2, 3, 4, 5, 6, 7, 8,9,10).verifyComplete(); //This now fails.

  }

  private Mono<Integer> subFlux(int i) {
    Mono<Integer> result = Mono.create(sink ->
    {
      if (i % 2 == 0) {
        try {
          Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
      }
      sink.success(i);
    });
    return result.subscribeOn(Schedulers.newBoundedElastic(2, 2, "other"));
  }

Upvotes: 2

Related Questions