Mayur

Reputation: 904

What are the ways to pass a ThreadPoolExecutor to CompletableFuture?

I have been working with Java's CompletableFuture lately and found that we should always use a custom thread pool. I found two ways of passing a thread pool to existing code, shown below.

This is my thread pool in the configuration file:

@Override
@Bean(name = "commonThreadPool")
public Executor getAsyncExecutor() {
  return new ThreadPoolTaskExecutor();
}

1. Passing the existing thread pool as an argument:

@Autowired
@Qualifier("commonThreadPool")
TaskExecutor existingThreadPool;

// Inside some method: pass the pool as the second argument
CompletableFuture.runAsync(() -> executeTask(), existingThreadPool);

2. Using @Async, as below:

@Async("commonThreadPool")
public void executeTask() {
// Execute Some Task
}
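
The first option above can be tried outside Spring with a plain JDK pool. This is only a minimal sketch: the class `ExplicitExecutorDemo`, the helper `newNamedPool`, and the thread-name prefix `common-pool` are hypothetical stand-ins for the Spring-managed `commonThreadPool` bean, not part of the original code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

class ExplicitExecutorDemo {
    // Hypothetical stand-in for the "commonThreadPool" bean: a fixed pool
    // whose threads carry a recognizable name prefix.
    static ExecutorService newNamedPool(String prefix, int size) {
        AtomicInteger counter = new AtomicInteger();
        return Executors.newFixedThreadPool(size, r -> {
            Thread t = new Thread(r, prefix + "-" + counter.incrementAndGet());
            t.setDaemon(true); // do not keep the JVM alive because of this pool
            return t;
        });
    }

    // Runs one task on the given pool and returns the name of the thread that ran it.
    static String runOnPool(ExecutorService pool) {
        return CompletableFuture
                .supplyAsync(() -> Thread.currentThread().getName(), pool)
                .join();
    }

    public static void main(String[] args) {
        ExecutorService pool = newNamedPool("common-pool", 2);
        try {
            // The printed name starts with "common-pool-", not "ForkJoinPool"
            System.out.println(runOnPool(pool));
        } finally {
            pool.shutdown();
        }
    }
}
```

Note that an executor passed this way applies only to that one stage; later `...Async` calls in the chain that omit the executor argument fall back to the default executor.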

Is there a third way, where I can write a CompletableFuture handler or override its existing behaviour in a single place and pass in my custom thread pool? After that, wherever I use the code below, it should pick my existing thread pool instead of the ForkJoin pool.

 CompletableFuture.runAsync(() -> executeTask());

Upvotes: 13

Views: 14074

Answers (2)

Holger

Reputation: 298213

There is no standard way to replace the default executor for all CompletableFuture instances. But since Java 9, you can define a default executor for subclasses. E.g. with

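import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Supplier;

public class MyCompletableFuture<T> extends CompletableFuture<T> {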
    static final Executor EXEC = r -> {
        System.out.println("executing "+r);
        new Thread(r).start();
    };

    @Override
    public Executor defaultExecutor() {
        return EXEC;
    }

    @Override
    public <U> CompletableFuture<U> newIncompleteFuture() {
        return new MyCompletableFuture<>();
    }

    public static CompletableFuture<Void> runAsync(Runnable runnable) {
        Objects.requireNonNull(runnable);
        return supplyAsync(() -> {
            runnable.run();
            return null;
        });
    }

    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
        return new MyCompletableFuture<U>().completeAsync(supplier);
    }
}

you have taken all the steps necessary to define the default executor for all chained stages of MyCompletableFuture. The executor held in EXEC only serves as an example, producing a printout when used, so when you use that example class like

MyCompletableFuture.supplyAsync(() -> "test")
    .thenApplyAsync(String::toUpperCase)
    .thenAcceptAsync(System.out::println);

it will print

executing java.util.concurrent.CompletableFuture$AsyncSupply@65ab7765
executing java.util.concurrent.CompletableFuture$UniApply@119d7047
executing java.util.concurrent.CompletableFuture$UniAccept@404b9385
TEST
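
For real use, the same pattern could return an actual thread pool from `defaultExecutor()` instead of the printing executor. The following is a sketch under assumptions, requiring Java 9 or later: the class name `PooledFuture`, the pool size, the `my-pool-` thread-name prefix, and the `stageThreads` helper are all made up for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class PooledFuture<T> extends CompletableFuture<T> {
    private static final AtomicInteger COUNTER = new AtomicInteger();

    // Hypothetical application pool with recognizable thread names.
    static final ExecutorService POOL = Executors.newFixedThreadPool(4, r -> {
        Thread t = new Thread(r, "my-pool-" + COUNTER.incrementAndGet());
        t.setDaemon(true);
        return t;
    });

    @Override
    public Executor defaultExecutor() {
        return POOL; // used by every ...Async method called without an executor
    }

    @Override
    public <U> CompletableFuture<U> newIncompleteFuture() {
        return new PooledFuture<>(); // dependent stages stay in this subclass
    }

    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
        return new PooledFuture<U>().completeAsync(supplier);
    }

    // Records which thread ran each of three chained async stages.
    static List<String> stageThreads() {
        List<String> names = Collections.synchronizedList(new ArrayList<>());
        PooledFuture.supplyAsync(() -> {
                    names.add(Thread.currentThread().getName());
                    return "test";
                })
                .thenApplyAsync(s -> {
                    names.add(Thread.currentThread().getName());
                    return s.toUpperCase();
                })
                .thenAcceptAsync(s -> names.add(Thread.currentThread().getName()))
                .join();
        return names;
    }

    public static void main(String[] args) {
        // Every recorded name starts with "my-pool-"
        System.out.println(stageThreads());
    }
}
```

The chain has to start from the subclass's own factory method. A future created through `CompletableFuture.supplyAsync` keeps using the JDK default executor.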

Upvotes: 9

Oleg

Reputation: 6314

I would strongly recommend against it, but if you really want to, you can use reflection to change the thread pool used by CompletableFuture.

public static void main(String[] args) throws Exception {
    // Prints ForkJoinPool.commonPool-worker-1
    CompletableFuture<Void> c = CompletableFuture.runAsync(() -> System.out.println(Thread.currentThread().getName()));
    c.get();

    setFinalStatic(CompletableFuture.class.getDeclaredField("asyncPool"), Executors.newFixedThreadPool(10));

    // Prints pool-1-thread-1
    c = CompletableFuture.runAsync(() -> System.out.println(Thread.currentThread().getName()));
    c.get();
}

static void setFinalStatic(Field field, Object newValue) throws Exception {
    field.setAccessible(true);
    Field modifiersField = Field.class.getDeclaredField("modifiers");
    modifiersField.setAccessible(true);
    modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);
    field.set(null, newValue);
}

setFinalStatic is taken from https://stackoverflow.com/a/3301720/1398418
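
Because the reflection approach depends on a private field of a JDK class, it may stop working on other Java versions. One alternative that keeps the executor choice in a single place is a small static facade that callers use instead of `CompletableFuture.runAsync`. This is a sketch only, and not something proposed in the answer above: `AppFutures`, `setExecutor`, and the `facade-pool` thread name are hypothetical.

```java
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.function.Supplier;

final class AppFutures {
    // Hypothetical application-wide executor; falls back to the common pool
    // until setExecutor is called (for example once at application startup).
    private static volatile Executor executor = ForkJoinPool.commonPool();

    private AppFutures() {}

    static void setExecutor(Executor e) {
        executor = Objects.requireNonNull(e);
    }

    static CompletableFuture<Void> runAsync(Runnable task) {
        return CompletableFuture.runAsync(task, executor);
    }

    static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
        return CompletableFuture.supplyAsync(supplier, executor);
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "facade-pool");
            t.setDaemon(true);
            return t;
        });
        setExecutor(pool);
        try {
            // Prints facade-pool
            System.out.println(supplyAsync(() -> Thread.currentThread().getName()).join());
        } finally {
            pool.shutdown();
        }
    }
}
```

The facade only controls the first stage. Chained `...Async` calls without an executor argument still use the default executor, so it does not fully replace the subclass approach.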

Upvotes: 2
