Reputation: 797
I am trying to make a reactive @WebFilter
that executes stuff before and after the actual server exchange (i. e. the controller code handling the request):
public static class Foobar implements WebFilter {
@Override
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
return Mono.empty()
.doOnEach(s -> /* BEFORE */))
.then(chain.filter(exchange) /* CONTROLLER */)
.and(Mono.empty().doOnEach(s -> /* AFTER */))
.and(Mono.empty().doFinally(s -> /* FINALLY */));
}
}
Everything works as expected for simple GET
requests that return a Mono
:
@RestController
@RequestMapping
public static class Foo {
@GetMapping
@PostMapping(value = "foo")
public Mono<String> foo(ServerWebExchange exchange) {
return Mono.just("FOOBAR").map(e -> "OK");
}
}
But something really unexpected happens when the controller receives a parameter annotated as @RequestBody
. Say, for example a POST
request that takes a Mono<String>
from the client:
@RestController
@RequestMapping
public static class Bar {
@PostMapping(value = "bar")
public Mono<String> bar(ServerWebExchange exchange, @RequestBody Mono<String> text) {
return text.map(s -> "OK");
}
}
In this case, all steps in my filter are executed before the controller gets to complete the request. This means that the web exchange is committed independently of the filter and therefore I cannot do anything right after the response is sent back to the client.
So I'm wondering:
I've created a small Gist containing a test case that reproduces the problem:
Edit after Brian's comment:
I still think this might be a bug because Mono.then
doesn't seem to have any effect at all:
@Override
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
return chain.filter(exchange)
.doOnSubscribe(s -> logger.info("onSubscribe response committed:" +
exchange.getResponse().isCommitted()))
.then().doOnEach(s -> logger.info("then doOnEach response committed:" +
exchange.getResponse().isCommitted()))
.doFinally(s -> logger.info("doFinally response committed:" +
exchange.getResponse().isCommitted()));
}
Additionally, if I put stuff in doOnEach
is not executed either:
@Override
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
return chain.filter(exchange)
.doOnSubscribe(s -> logger.info("FILTER-BEFORE-CHAIN/commited=" +
response.isCommitted()))
.doOnEach(s -> logger.info("FILTER-AFTER-CHAIN/commited=" +
response.isCommitted()))
.doFinally(s -> logger.info("FILTER-FINALLY/commited=" +
response.isCommitted()));
}
Upvotes: 3
Views: 1861
Reputation: 59086
I don't think this is a bug in Spring (nor in Reactor in this case), but rather a wrong choice of operators to achieve what you're trying to do.
Mono.doOnEach
is executed for each signal (next element, completion, error, cancel); this will be executed several times per request in our case.Mono.and
joins the termination signals - so it waits for both Mono
to be done and then completes. But both Monos are not executed sequentially, they're subscribed at the same time. Mono.just
completes right away, independently of what happens with the filter chain.In your case, you don't need to have something more complex than adding one operator when the processing starts (doOnSubscribe
happens when the mapping has been done and we're starting to execute the handler) and another one when we're done (doFinally
happens when it's done, with completion, error or cancellation).
@Component
public class MyFilter implements WebFilter {
Logger logger = LoggerFactory.getLogger(getClass());
@Override
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
return chain.filter(exchange)
.doOnSubscribe(s -> logger.info("onSubscribe response committed:" +
exchange.getResponse().isCommitted()))
.doFinally(s -> logger.info("doFinally response committed:" +
exchange.getResponse().isCommitted()));
}
}
Note that this approach works as long as you're not performing blocking operations in the DoOnXYZ
methods, as they're side effects methods. Doing so will create significant performance problems in your application (and reactor might even reject that operation with an exception). So logging, adding an element to a map are fine - but publishing something to an event queue, for example, is not. In this case, you should use instead chain operations with Mono.then()
.
I don't think there is a bug with Mono.then()
- doOnEach
only works with signals sent downstream (next, error, complete). In this case, you should only get the complete signal. If you'd like to get the context in all cases, you can take a look at Mono.deferWithContext
.
Upvotes: 1