Chinmay

Reputation: 751

Is it bad practice to use the default common fork/join pool with CompletableFuture for long blocking calls?

Let's say I have a CompletableFuture that wraps a blocking call, such as querying a backend over JDBC. Since I am not passing an executor service as a parameter to CompletableFuture.supplyAsync(), the actual blocking work of fetching the results from the backend is done by a thread from the common Fork/Join pool. Isn't it bad practice to have threads from the common FJ pool perform blocking calls? The advantage here is that my main thread isn't blocked, since I'm delegating the blocking calls to run asynchronously. (See here about JDBC calls being blocking.) If this inference is true, why do we have the option of using the default common FJ pool with CompletableFuture?

CompletableFuture<List<String>> fetchUnicorns  = 
    CompletableFuture.supplyAsync(() -> {
        return unicornService.getUnicorns();
    });

fetchUnicorns.thenAccept(unicorns -> { /* do something with the result */ });

Upvotes: 20

Views: 10463

Answers (3)

Kyle Winkelman

Reputation: 461

Yes, it is bad practice. I suspect the reason ForkJoinPool is a sane default is that, for compute-heavy tasks, it performs about as well as creating a Thread for each task, but without the overhead of actually creating those threads.

Here is an example in which the common pool limits the number of concurrently running CompletableFuture instances to roughly the number of cores on your machine (it takes about 2 minutes on mine):

public static void main(String... args) {
  var completableFutures = new ArrayList<CompletableFuture<Void>>(100);
  for (int i = 0; i < 100; i++) {
    int index = i;
    completableFutures.add(
        CompletableFuture.runAsync(
            () -> {
              try {
                Thread.sleep(10000);
                System.out.printf(
                    "Thread: %s, Index: %d, Time:%d%n",
                    Thread.currentThread().getName(), index, System.currentTimeMillis());
              } catch (InterruptedException e) {
                throw new RuntimeException(e);
              }
            }));
  }
  completableFutures.forEach(CompletableFuture::join);
}

Since Java 21 there are Virtual Threads. By supplying Executors.newVirtualThreadPerTaskExecutor() to CompletableFuture.runAsync(Runnable, Executor) (or supplyAsync(Supplier, Executor)), each task gets its own virtual thread. The example below runs all CompletableFuture instances at the same time (it takes about 10 seconds on my machine):

public static void main(String... args) {
  try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
    for (var i = 0; i < 100; i++) {
      var index = i;
      CompletableFuture.runAsync(
          () -> {
            try {
              Thread.sleep(10000);
              System.out.printf(
                  "Thread: %s, Index: %d, Time:%d%n",
                  Thread.currentThread().getName(), index, System.currentTimeMillis());
            } catch (InterruptedException e) {
              throw new RuntimeException(e);
            }
          },
          executor);
    }
  }
}

Upvotes: 0

Holger

Reputation: 298579

The reason you should not make blocking calls (in this way) is that the common pool's parallelism is configured to utilize the available CPU cores, assuming non-blocking jobs. Blocked threads reduce the parallelism available to other tasks using the same pool.
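
As a small illustration of that point (my addition, not part of the original answer), you can print the common pool's parallelism and see that it is derived from the core count unless overridden with the java.util.concurrent.ForkJoinPool.common.parallelism system property:

import java.util.concurrent.ForkJoinPool;

public class CommonPoolParallelism {
  public static void main(String[] args) {
    // Usually availableProcessors() - 1, unless overridden with
    // -Djava.util.concurrent.ForkJoinPool.common.parallelism=N
    System.out.println("Available processors: "
        + Runtime.getRuntime().availableProcessors());
    System.out.println("Common pool parallelism: "
        + ForkJoinPool.commonPool().getParallelism());
  }
}

Every thread of that small pool that sits in a blocking JDBC call is a thread that cannot execute other tasks submitted to the common pool.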

But there is an official solution to this:

class BlockingGetUnicorns implements ForkJoinPool.ManagedBlocker {
    List<String> unicorns;
    public boolean block() {
        unicorns = unicornService.getUnicorns();
        return true;
    }
    public boolean isReleasable() { return false; }
}
CompletableFuture<List<String>> fetchUnicorns  = 
    CompletableFuture.supplyAsync(() -> {
        BlockingGetUnicorns getThem = new BlockingGetUnicorns();
        try {
            ForkJoinPool.managedBlock(getThem);
        } catch (InterruptedException ex) {
            throw new AssertionError();
        }
        return getThem.unicorns;
    });

ForkJoinPool.ManagedBlocker is the abstraction of a potentially blocking operation that allows the Fork/Join pool to create compensation threads when it recognizes that a worker thread is about to be blocked.
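
A minimal sketch of that effect (mine, not from the answer): wrapping the blocking sleep in managedBlock lets the common pool grow beyond its configured parallelism, so all 32 tasks can block at the same time and the whole run finishes in roughly one sleep interval:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

public class ManagedBlockerDemo {
  public static void main(String[] args) {
    var futures = IntStream.range(0, 32)
        .mapToObj(i -> CompletableFuture.runAsync(() -> {
          try {
            ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker() {
              public boolean block() throws InterruptedException {
                Thread.sleep(2000); // stand-in for blocking I/O
                return true;        // blocking is done, no need to call again
              }
              public boolean isReleasable() { return false; }
            });
          } catch (InterruptedException e) {
            throw new RuntimeException(e);
          }
        }))
        .toList();
    futures.forEach(CompletableFuture::join);
    // Typically larger than the configured parallelism, because the pool
    // created compensation threads for the blocked workers.
    System.out.println("Common pool size: " + ForkJoinPool.commonPool().getPoolSize());
  }
}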

It should be obvious that it is much easier to use

CompletableFuture<List<String>> fetchUnicorns  = 
    CompletableFuture.supplyAsync(() -> unicornService.getUnicorns(),
                                  Executors.newSingleThreadExecutor());

here. In a production environment, you would keep a reference to the executor, reuse it, and eventually call shutdown() on it. For a use case where the executor is not reused,

CompletableFuture<List<String>> fetchUnicorns  = 
    CompletableFuture.supplyAsync(() -> unicornService.getUnicorns(),
                                  r -> new Thread(r).start());

would suffice, as the thread will then be disposed of automatically after the job's completion.
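
To make the reuse pattern concrete, here is a hedged sketch (class, field, and pool size are illustrative, not from the answer) of keeping one dedicated executor for all blocking backend calls and shutting it down when the client is closed:

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class UnicornClient {
    interface UnicornService { List<String> getUnicorns(); }

    // Dedicated pool for blocking calls, sized independently of the common pool
    private final ExecutorService blockingPool = Executors.newFixedThreadPool(8);
    private final UnicornService unicornService;

    UnicornClient(UnicornService unicornService) {
        this.unicornService = unicornService;
    }

    CompletableFuture<List<String>> fetchUnicorns() {
        return CompletableFuture.supplyAsync(unicornService::getUnicorns, blockingPool);
    }

    void close() {
        blockingPool.shutdown(); // release the worker threads once the client is done
    }
}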

Upvotes: 23

the8472

Reputation: 43150

If this inference is true, why do we have the option of using the default common FJ pool with CompletableFuture?

Because not all work is blocking.

You have the option to schedule your blocking work on a custom executor with CompletableFuture.supplyAsync(Supplier<U>, Executor).
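
For example (a sketch of my own, with an arbitrary pool size and a sleep standing in for the JDBC call), the blocking fetch runs on a dedicated executor while the non-blocking continuation stays on the common pool:

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SplitWork {
  public static void main(String[] args) {
    ExecutorService blockingExecutor = Executors.newFixedThreadPool(4);
    try {
      CompletableFuture<List<String>> unicorns =
          CompletableFuture.supplyAsync(SplitWork::blockingFetch, blockingExecutor)
              // non-blocking post-processing may run on the common pool
              .thenApplyAsync(List::copyOf);
      System.out.println(unicorns.join());
    } finally {
      blockingExecutor.shutdown();
    }
  }

  // Stand-in for a blocking backend call such as a JDBC query
  private static List<String> blockingFetch() {
    try {
      Thread.sleep(1000);
    } catch (InterruptedException e) {
      throw new RuntimeException(e);
    }
    return List.of("sparkle", "rainbow");
  }
}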

Upvotes: 1
