Reputation: 73
I know subscribeOn is used to switch the executing thread when a sequence is subscribed to, but I found that it doesn't work with ServerRequest.bodyToMono/Flux.
Something like
    Flux.just(1, 2, 3)
        .doOnNext(integer -> log.info("test {}", integer))
        .subscribeOn(Schedulers.elastic())
        .subscribe();
will change the execution thread:
INFO 23313 --- [ elastic-2] c.a.p.m.f.service.router.TestService : test 1
INFO 23313 --- [ elastic-2] c.a.p.m.f.service.router.TestService : test 2
INFO 23313 --- [ elastic-2] c.a.p.m.f.service.router.TestService : test 3
What confuses me is the following case.
Say I have a Spring WebFlux router:
    @Configuration
    public class TestRouter {
        @Bean
        public RouterFunction<ServerResponse> testRouterFunction(TestService testService) {
            return route().path("/test", builder -> builder.nest(accept(MediaType.ALL),
                    route -> route.PUT("/", req -> {
                        Mono<String> valueMono = req.bodyToMono(String.class);
                        return ServerResponse.ok().body(testService.test(valueMono), String.class);
                    }))).build();
        }
    }
and a Service:
    @Service
    @Slf4j
    public class TestService {
        public Mono<String> test(Mono<String> mono) {
            return mono
                    .doOnSubscribe(subscription -> log.info("on subscribe"))
                    .subscribeOn(Schedulers.elastic())
                    .doOnNext(s -> log.info("received {}", s))
                    .subscribeOn(Schedulers.elastic());
        }
    }
The basic logic is that an HTTP PUT request to localhost:port/test receives back, as plain text, whatever it sent to the server.
I am trying to make doOnNext run on a thread other than Spring WebFlux's NIO thread, but no matter where I put subscribeOn, the execution thread is always the NIO thread:
INFO 23200 --- [ctor-http-nio-4] c.a.p.m.f.service.router.TestService : on subscribe
INFO 23200 --- [ctor-http-nio-4] c.a.p.m.f.service.router.TestService : received test
Thanks to @MichaelBerry and @SimonBaslé. Both of you helped me a lot, and I have upvoted both of your answers.
In short, reactor-netty overrides subscribeOn for the HTTP subscription. Using a flatMap() that contains a separate subscribeOn() on a different Mono/Flux, or using publishOn(), does the job I want.
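For reference, the publishOn() option mentioned above can be sketched roughly as follows. This is only a minimal illustration under assumptions: the class name PublishOnSketch is hypothetical, Mono.just("test") stands in for req.bodyToMono(String.class), and Schedulers.boundedElastic() (available in Reactor 3.3 and later) is used in place of the now-deprecated Schedulers.elastic().

```java
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

// Hypothetical sketch, not the original TestService.
public class PublishOnSketch {

    // publishOn moves the signals that flow downstream of it onto the given
    // scheduler, so the map step below runs on one of that scheduler's threads
    // instead of the thread on which the source emitted.
    static Mono<String> test(Mono<String> body) {
        return body
                .publishOn(Schedulers.boundedElastic())
                .map(s -> Thread.currentThread().getName() + " received " + s);
    }

    public static void main(String[] args) {
        // Mono.just is an assumption standing in for the request body publisher.
        System.out.println(test(Mono.just("test")).block());
    }
}
```

Unlike subscribeOn, publishOn only affects the operators placed after it, so where it sits in the chain matters.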
Upvotes: 1
Views: 1643
Reputation: 72344
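This isn't something you can change from inside your handler. When several subscribeOn() calls are chained before subscribe() is called, only one of them decides where the source is subscribed (in Reactor, the one closest to the source), and the source itself can still emit on whatever thread it chooses, so it's up to WebFlux to use whatever scheduler it wants to. In this case it looks like it's handling the requests in an NIO-driven event loop or similar.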
However, you can include a flatMap() call in your chain, inside which you can specify a separate subscribeOn() that won't be overridden. This may be an option depending on your use case, as you could do the bulk of the work in a publisher defined in the flatMap() call.
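The flatMap() approach described above might look roughly like this. It is a minimal sketch under assumptions: the class name FlatMapSubscribeOnSketch and the helper process are hypothetical, Mono.just("test") stands in for the request body, and Schedulers.boundedElastic() (Reactor 3.3 and later) replaces the deprecated Schedulers.elastic().

```java
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

// Hypothetical sketch, not the original TestService.
public class FlatMapSubscribeOnSketch {

    // Hypothetical helper holding the bulk of the work for one request body.
    static Mono<String> process(String body) {
        return Mono.fromCallable(() -> Thread.currentThread().getName() + " received " + body)
                // This subscribeOn belongs to the inner Mono only, so the
                // callable above runs on a thread of the given scheduler.
                .subscribeOn(Schedulers.boundedElastic());
    }

    static Mono<String> test(Mono<String> body) {
        // The outer source may emit on any thread; the inner publisher
        // created per element carries its own scheduler.
        return body.flatMap(FlatMapSubscribeOnSketch::process);
    }

    public static void main(String[] args) {
        // Mono.just is an assumption standing in for req.bodyToMono(String.class).
        System.out.println(test(Mono.just("test")).block());
    }
}
```

The design point is that the inner publisher is a different source from the request body, so the thread used by the outer source does not determine where the inner work runs.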
Upvotes: 3