Leo Prince
Leo Prince

Reputation: 950

Running Tasks in different thread in Spring Webflux Annotated controller

I have a spring Webflux Annotated controller as below,

 @RestController
public class TestBlockingController {

  Logger log = LoggerFactory.getLogger(this.getClass().getName());

  @GetMapping()
  public Mono<String> blockForXSeconds(@RequestParam("block-seconds") Integer blockSeconds) {
    return getStringMono();
  }

  private Mono<String> getStringMono() {
    Integer blockSeconds = 5;
    String type = new String();
    try {
      if (blockSeconds % 2 == 0) {
        Thread.sleep(blockSeconds * 1000);
        type = "EVEN";
      } else {
        Thread.sleep(blockSeconds * 1000);
        type = "ODD";
      }
    } catch (Exception e) {
      log.info("Got Exception");
    }
    log.info("Type of block-seconds: " + blockSeconds);
    return Mono.just(type);
  }
}

How do I make getStringMono run in a different thread than Netty server threads. The problem I am facing is that as I am running in server thread I am getting basically less throughput (2 requests per second). How do I go about making running getStringMono in a separate thread.

Upvotes: 0

Views: 3662

Answers (2)

Sanjib Pal
Sanjib Pal

Reputation: 158

You can use Mono.defer() method. The method signature is as:

public static <T> Mono<T> defer(Supplier<? extends Mono<? extends T>> supplier)

Your Rest API should look like this.

@GetMapping()
public Mono<String> blockForXSeconds(@RequestParam("block-seconds") Integer blockSeconds) {
    return Mono.defer(() -> getStringMono());
}

The defer operator is there to make this source lazy, re-evaluating the content of the lambda each time there is a new subscriber. This will increase your API throughput.

Here you can view the detailed analysis.

Upvotes: 0

Martin Tarj&#225;nyi
Martin Tarj&#225;nyi

Reputation: 9987

You can use subscribeOn operator to delegate the task to a different threadpool:

Mono.defer(() -> getStringMono()).subscribeOn(Schedulers.elastic());

Although, you have to note that this type of blocking should be avoided in a reactive application at any cost. If possible, use a client which supports non-blocking IO and returns a promise type (Mono, CompletableFuture, etc.). If you just want to have an artificial delay, then use Mono.delay instead.

Upvotes: 1

Related Questions