Mulgard
Mulgard

Reputation: 10589

How to wrap a Flux with a blocking operation in the subscribe?

In the documentation it is written that you should wrap blocking code into a Mono: http://projectreactor.io/docs/core/release/reference/#faq.wrap-blocking

But it is not written how to actually do it.

I have the following code:

@PostMapping(path = "some-path", consumes = MediaType.APPLICATION_STREAM_JSON_VALUE)
public Mono<Void> doeSomething(@Valid @RequestBody Flux<Something> something) {
    something.subscribe(something -> {
        // some blocking operation
    });

    // how to return Mono<Void> here?
}

The first problem I have here is that I need to return something but I cant. If I would return a Mono.empty for example the request would be closed before the work of the flux is done.

The second problem is: how do I actually wrap the blocking code like it is suggested in the documentation:

Mono blockingWrapper = Mono.fromCallable(() -> { 
    return /* make a remote synchronous call */ 
});
blockingWrapper = blockingWrapper.subscribeOn(Schedulers.elastic()); 

Upvotes: 2

Views: 4929

Answers (1)

Brian Clozel
Brian Clozel

Reputation: 59086

You should not call subscribe within a controller handler, but just build a reactive pipeline and return it. Ultimately, the HTTP client will request data (through the Spring WebFlux engine) and that's what subscribes and requests data to the pipeline.

Subscribing manually will decouple the request processing from that other operation, which will 1) remove any guarantee about the order of operations and 2) break the processing if that other operation is using HTTP resources (such as the request body).

In this case, the source is not blocking, but only the transform operation is. So we'd better use publishOn to signal that the rest of the chain should be executed on a specific Scheduler. If the operation here is I/O bound, then Schedulers.elastic() is the best choice, if it's CPU-bound then Schedulers .paralell is better. Here's an example:

@PostMapping(path = "/some-path", consumes = MediaType.APPLICATION_STREAM_JSON_VALUE)
public Mono<Void> doSomething(@Valid @RequestBody Flux<Something> something) {

    return something.collectList()
      .publishOn(Schedulers.elastic())
      .map(things -> { 
         return processThings(things);
      })
      .then();        
}

public ProcessingResult processThings(List<Something> things) {
  //...
}

For more information on that topic, check out the Scheduler section in the reactor docs. If your application tends to do a lot of things like this, you're losing a lot of the benefits of reactive streams and you might consider switching to a Servlet-based model where you can configure thread pools accordingly.

Upvotes: 7

Related Questions