revivn

Reputation: 73

subscribeOn not working when using it on ServerRequest's bodyToMono

I know subscribeOn is used to switch the executing thread when a sequence is subscribed to, but I found it doesn't work with ServerRequest.bodyToMono/bodyToFlux.

For example, this:

Flux.just(1,2,3)
          .doOnNext(integer -> log.info("test {}",integer))
          .subscribeOn(Schedulers.elastic())
          .subscribe();

changes the execution thread:

INFO 23313 --- [      elastic-2] c.a.p.m.f.service.router.TestService     : test 1
INFO 23313 --- [      elastic-2] c.a.p.m.f.service.router.TestService     : test 2
INFO 23313 --- [      elastic-2] c.a.p.m.f.service.router.TestService     : test 3

What confuses me is the following. Say I have a Spring WebFlux router:

@Configuration
public class TestRouter {
    @Bean
    public RouterFunction<ServerResponse> testRouterFunction(TestService testService) {
        return route().path("/test", builder -> builder.nest(accept(MediaType.ALL),
          route -> route.PUT("/", req -> {
              Mono<String> valueMono = req.bodyToMono(String.class);
              return ServerResponse.ok().body(testService.test(valueMono), String.class);
          }))).build();
    }
}

and a Service:

@Service
@Slf4j
public class TestService {
    public Mono<String> test(Mono<String> mono) {
        return mono
          .doOnSubscribe(subscription -> log.info("on subscribe"))
          .subscribeOn(Schedulers.elastic())
          .doOnNext(s -> log.info("received {}", s))
          .subscribeOn(Schedulers.elastic());
    }
}

The basic logic is that an HTTP PUT request to localhost:port/test gets back whatever it sent to the server, as plain text.

I am trying to make doOnNext run on a thread other than Spring WebFlux's NIO thread, but no matter where I put subscribeOn, the execution thread is always the NIO thread:

INFO 23200 --- [ctor-http-nio-4] c.a.p.m.f.service.router.TestService     : on subscribe
INFO 23200 --- [ctor-http-nio-4] c.a.p.m.f.service.router.TestService     : received test

Thanks to @MichaelBerry and @SimonBaslé. Both of you helped me a lot, and I have upvoted both of your answers.

In short, reactor-netty overrides subscribeOn for the HTTP subscription. Using a flatMap() that contains a separate subscribeOn() on a different Mono/Flux, or using publishOn(), does the job I want.

Upvotes: 1

Views: 1643

Answers (1)

Michael Berry

Reputation: 72344

This isn't something you can change - it's only the last subscribeOn() call in the chain before subscribe() is called that's honoured, so it's up to WebFlux to use whatever scheduler it wants to. In this case it looks like it's handling the requests in an NIO driven event loop or similar.

However, you can include a flatMap() call in your chain, for which you can specify a separate subscribeOn() which won't be overridden. This may be an option depending on your use case, as you could do the bulk of the work in a publisher defined in the flatMap() call.
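
To make the difference concrete, here is a minimal sketch that needs only reactor-core on the classpath, not WebFlux. Everything in it is an assumption for illustration: fakeBody is a hypothetical stand-in for req.bodyToMono(String.class) that always emits from a single thread named "fake-nio", and Schedulers.boundedElastic() is used where the question uses Schedulers.elastic(). It does not reproduce what reactor-netty does internally; it only shows that when the source emits from its own thread, subscribeOn alone does not move downstream callbacks, while publishOn or a flatMap with an inner subscribeOn does.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class ThreadHopSketch {

    // Hypothetical stand-in for the server's event loop: one daemon thread named "fake-nio".
    static ExecutorService newFakeEventLoop() {
        return Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "fake-nio");
            t.setDaemon(true);
            return t;
        });
    }

    // Hypothetical stand-in for req.bodyToMono(String.class): the value is always
    // emitted from the event-loop thread, whichever thread subscribes.
    static Mono<String> fakeBody(ExecutorService eventLoop) {
        return Mono.create(sink -> eventLoop.execute(() -> sink.success("test")));
    }

    // subscribeOn only moves the subscription; the value still arrives on the
    // emitting thread, so map() runs there.
    static String subscribeOnOnly(Mono<String> body) {
        return body
            .subscribeOn(Schedulers.boundedElastic())
            .map(s -> Thread.currentThread().getName())
            .block();
    }

    // publishOn moves everything downstream of it onto the given scheduler.
    static String withPublishOn(Mono<String> body) {
        return body
            .publishOn(Schedulers.boundedElastic())
            .map(s -> Thread.currentThread().getName())
            .block();
    }

    // flatMap with an inner publisher that carries its own subscribeOn: the inner
    // work runs on the scheduler chosen for the inner publisher.
    static String withFlatMap(Mono<String> body) {
        return body
            .flatMap(s -> Mono.fromCallable(() -> Thread.currentThread().getName())
                .subscribeOn(Schedulers.boundedElastic()))
            .block();
    }

    public static void main(String[] args) {
        ExecutorService loop = newFakeEventLoop();
        System.out.println("subscribeOn only: " + subscribeOnOnly(fakeBody(loop)));
        System.out.println("publishOn:        " + withPublishOn(fakeBody(loop)));
        System.out.println("flatMap:          " + withFlatMap(fakeBody(loop)));
        loop.shutdown();
    }
}
```

In the service from the question, the publishOn variant would amount to putting publishOn(...) before doOnNext, and the flatMap variant to moving the heavy work into a publisher returned from flatMap.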

Upvotes: 3
