Reputation: 2614
I want to call CompletableFuture.supplyAsync()
to delegate a blocking task to another thread. Once that task completes I would like for the CompletableFuture.thenAccept
consumer to run in the context of the calling thread.
For example:
// Thread 1
CompletableFuture.supplyAsync(() -> {
// Thread 2
return BlockingMethod();
}).thenAccept((
Object r) -> {
// Thread 1
});
The following code suggests that CompletableFuture.thenAccept
runs in its own thread; probably the same pool as CompletableFuture.supplyAsync
as I get the same thread ID when I run it:
System.out.println("Sync thread supply " + Thread.currentThread().getId());
CompletableFuture.supplyAsync(() -> {
System.out.println("Async thread " + Thread.currentThread().getId());
try {
Thread.sleep(2000);
}
catch (Exception e) {
e.printStackTrace();
}
return true;
}).thenAccept((
Boolean r) -> {
System.out.println("Sync thread consume " + Thread.currentThread().getId());
});
Thread.sleep(3000);
Is it possible to have CompletableFuture.thenAccept
run concurrently with the calling thread?
Upvotes: 5
Views: 6718
Reputation: 1650
If I understand your idea right that you want to fork I/O-intensive tasks but do all processing in the "event loop" (think Javascript), then your code could be translated to
Executor eventLoop = Executors.newSingleThreadExecutor();
Executor ioMultiplexor = Executors.newCachedThreadPool();
eventLoop.execute(() -> {
System.out.println("event loop thread supply " + Thread.currentThread().getId());
CompletableFuture.supplyAsync(() -> {
System.out.println("I/O multiplexor thread " + Thread.currentThread().getId());
try {
Thread.sleep(2000);
} catch (Exception e) {
e.printStackTrace();
}
return true;
}, ioMultiplexor).thenAcceptAsync((Boolean r) -> {
System.out.println("event loop thread consume " + Thread.currentThread().getId());
}, eventLoop);
});
Thread.sleep(3000);
// shut down executors
This prints
event loop thread supply 10
I/O multiplexor thread 11
event loop thread consume 10
If this code is used for some request handling where there can be many concurrent requests, you would probably want to have one global eventLoop
and one global ioMultiplexer
, and you would also want to release the main request handling thread once it finishes submitting the task onto eventLoop
.
Upvotes: 0
Reputation: 279990
The CompletableFuture
will only execute the Consumer
you register with thenAccept
when the receiver CompletableFuture
(one returned by supplyAsync
) is completed, since it needs the value it was completed with.
If the receiver CompletableFuture
is complete when thenAccept
is invoked, then the Consumer
will be executed in the calling thread. Otherwise, it will execute on whatever thread completes the Supplier
submitted to supplyAsync
.
Is it possible to have
CompletableFuture.thenAccept
run concurrently with the calling thread?
This is a confusing question because a thread can only run one thing at a time. There's no concurrently for a single thread. Concurrently is a property that spans multiple threads.
If you want the Consumer
to run on the same thread that invoked thenAccept
, then join
on the CompletableFuture
, blocking this thread until the future is completed. You can then execute the Consumer
yourself or call thenAccept
to execute it for you.
For example
CompletableFuture<Boolean> receiver = CompletableFuture.supplyAsync(() -> {
System.out.println("Async thread " + Thread.currentThread().getId());
try {
Thread.sleep(2000);
} catch (Exception e) {
e.printStackTrace();
}
return true;
});
receiver.join();
Consumer<Boolean> consumer = (Boolean r) -> {
System.out.println("Sync thread consume " + Thread.currentThread().getId());
};
consumer.accept(receiver.get());
(Exception handling omitted.)
If you want the Consumer
to run in parallel with the Supplier
supplied to supplyAsync
, that's not possible. That Consumer
is meant to consume the value produced by the Supplier
. That value isn't available until the Supplier
is finished.
Upvotes: 8