Reputation:
I have an ExecutorService which forwards the computed data to a CompletableFuture:
class DataRetriever {
    private final ExecutorService service = ...;

    public CompletableFuture<Data> retrieve() {
        final CompletableFuture<Data> future = new CompletableFuture<>();
        service.execute(() -> {
            final Data data = ... fetch data ...
            future.complete(data);
        });
        return future;
    }
}
I want the client/user to be able to cancel the task:
final DataRetriever retriever = new DataRetriever();
final CompletableFuture<Data> future = retriever.retrieve();
future.cancel(true);
This does not work: it cancels the outer CompletableFuture, but not the inner task scheduled in the executor service.
Is it somehow possible to propagate cancel() on the outer future to the inner future?
Upvotes: 3
Views: 5416
Reputation: 319
With my Tascalate Concurrent library, your code can be rewritten as follows:
class DataRetriever {
    private final ExecutorService service = ...;

    public Promise<Data> retrieve() {
        return CompletableTask.supplyAsync(() -> {
            final Data data = ... fetch data ...
            return data;
        }, service);
    }
}
Promise and CompletableTask are classes from my library; you can read more in my blog.
Upvotes: 1
Reputation: 9944
Another solution, which Pillar touched on, is to extend CompletableFuture. Here's one method that's quite similar to your existing code. It also handles exceptions, which is a nice bonus.
class CancelableFuture<T> extends CompletableFuture<T> {
    private Future<?> inner;

    /**
     * Creates a new CancelableFuture which will be completed by calling the
     * given {@link Callable} via the provided {@link ExecutorService}.
     */
    public CancelableFuture(Callable<T> task, ExecutorService executor) {
        this.inner = executor.submit(() -> complete(task));
    }

    /**
     * Completes this future by executing a {@link Callable}. If the call throws
     * an exception, the future will complete with that exception. Otherwise,
     * the future will complete with the value returned from the callable.
     */
    private void complete(Callable<T> callable) {
        try {
            T result = callable.call();
            complete(result);
        } catch (Exception e) {
            completeExceptionally(e);
        }
    }

    @Override
    public boolean cancel(boolean mayInterrupt) {
        return inner.cancel(mayInterrupt) && super.cancel(true);
    }
}
Then, in DataRetriever, you can simply do:
public CompletableFuture<Data> retrieve() {
    return new CancelableFuture<>(() -> {... fetch data ...}, service);
}
Upvotes: 1
Reputation: 9944
By adding an exception handler to the outer future, you can have the call to cancel be passed down to the inner future. This works because CompletableFuture.cancel causes the future to complete exceptionally with a CancellationException.
private final ExecutorService service = ...;

public CompletableFuture<Data> retrieve() {
    final CompletableFuture<Data> outer = new CompletableFuture<>();
    final Future<?> inner = service.submit(() -> {
        ...
        outer.complete(data);
    });
    outer.exceptionally((error) -> {
        if (error instanceof CancellationException) {
            inner.cancel(true);
        }
        return null; // must return something, because 'exceptionally' expects a Function
    });
    return outer;
}
The call to outer.exceptionally creates a new CompletableFuture, so it doesn't affect the cancellation or exception status of outer itself. You can still append any other CompletionStage you like to outer, including another exceptionally stage, and it will operate as expected.
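To see the propagation end to end, here is a minimal self-contained sketch of the handler approach. The class name CancelPropagationDemo, the generic retrieve helper, and the sleeping task are made up for illustration; the 10-second sleep stands in for the real data fetch.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class CancelPropagationDemo {

    /** Hypothetical helper: runs 'work' on 'service' and forwards cancel() to the task. */
    static <T> CompletableFuture<T> retrieve(ExecutorService service, Callable<T> work) {
        CompletableFuture<T> outer = new CompletableFuture<>();
        Future<?> inner = service.submit(() -> {
            try {
                outer.complete(work.call());
            } catch (Exception e) {
                outer.completeExceptionally(e);
            }
        });
        // Runs when 'outer' completes exceptionally, which includes cancellation.
        outer.exceptionally(error -> {
            if (error instanceof CancellationException) {
                inner.cancel(true); // interrupt the worker thread
            }
            return null;
        });
        return outer;
    }

    /** Returns true if cancelling the outer future interrupted the running task. */
    static boolean cancelInterruptsWorker() {
        ExecutorService service = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch interrupted = new CountDownLatch(1);
        try {
            CompletableFuture<String> future = retrieve(service, () -> {
                started.countDown();
                try {
                    Thread.sleep(10_000); // stand-in for a slow fetch
                    return "data";
                } catch (InterruptedException e) {
                    interrupted.countDown();
                    throw e;
                }
            });
            started.await();     // make sure the task is actually running
            future.cancel(true); // triggers the exceptionally handler
            return interrupted.await(2, TimeUnit.SECONDS) && future.isCancelled();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            service.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("worker interrupted: " + cancelInterruptsWorker());
    }
}
```

Note that the handler runs on whichever thread completes outer, so in the cancellation case inner.cancel(true) is invoked from the thread that called cancel.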
Upvotes: 0
Reputation: 3531
CompletableFuture#cancel really only serves the purpose of marking a CompletableFuture as cancelled. It does nothing to notify an executing task to stop, because it has no relationship to any such task.
The javadoc hints at this:
mayInterruptIfRunning - this value has no effect in this implementation because interrupts are not used to control processing.
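A small experiment makes this visible. The sketch below is illustrative only: the class name CancelDoesNotInterrupt and the 300 ms sleep that stands in for real work are assumptions, not part of the question's code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

public class CancelDoesNotInterrupt {

    /** Returns true if the worker saw an interrupt after future.cancel(true). */
    static boolean workerWasInterrupted() {
        ExecutorService service = Executors.newSingleThreadExecutor();
        CompletableFuture<String> future = new CompletableFuture<>();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);
        AtomicBoolean interrupted = new AtomicBoolean(false);

        service.execute(() -> {
            started.countDown();
            try {
                Thread.sleep(300); // stand-in for the real work
            } catch (InterruptedException e) {
                interrupted.set(true);
            }
            future.complete("data"); // has no effect once the future is cancelled
            finished.countDown();
        });

        try {
            started.await();     // wait until the task is running
            future.cancel(true); // only marks the future as cancelled
            finished.await();    // the task still runs to the end
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            service.shutdown();
        }
        return interrupted.get();
    }

    public static void main(String[] args) {
        System.out.println("worker interrupted: " + workerWasInterrupted());
    }
}
```

The worker sleeps through the cancellation undisturbed, so this should report that no interrupt was delivered even though the future itself is cancelled.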
The inner future in your example is a Future, not a CompletableFuture, and it does have a relationship to the executing Runnable (or Callable). Internally, since it knows which Thread the task is executing on, it can send that thread an interrupt to attempt to stop it.
One option is to return a tuple of some sort (e.g. a POJO) that holds a reference to both your CompletableFuture and the Future returned by ExecutorService#submit. You can use the Future to cancel the task if you need to. You'd have to remember to cancel or complete your CompletableFuture as well, so that other parts of your code don't remain blocked/starved forever.
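A rough sketch of such a holder follows. TaskHandle, submit, and TaskHandleDemo are hypothetical names chosen for this example, and the sleeping task is a placeholder for the real fetch. The CompletableFuture is cancelled before the task is interrupted so that the worker cannot complete it with an InterruptedException first.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class TaskHandleDemo {

    /** Hypothetical holder pairing the result future with the executor's Future. */
    static final class TaskHandle<T> {
        private final CompletableFuture<T> result;
        private final Future<?> task;

        TaskHandle(CompletableFuture<T> result, Future<?> task) {
            this.result = result;
            this.task = task;
        }

        CompletableFuture<T> result() {
            return result;
        }

        /** Marks the result as cancelled, then interrupts the running task. */
        boolean cancel() {
            boolean cancelled = result.cancel(true); // unblocks anyone waiting on the result
            task.cancel(true);                       // interrupts the worker thread
            return cancelled;
        }
    }

    /** Hypothetical helper: submits 'work' and returns both futures in one handle. */
    static <T> TaskHandle<T> submit(ExecutorService service, Callable<T> work) {
        CompletableFuture<T> result = new CompletableFuture<>();
        Future<?> task = service.submit(() -> {
            try {
                result.complete(work.call());
            } catch (Exception e) {
                result.completeExceptionally(e);
            }
        });
        return new TaskHandle<>(result, task);
    }

    /** Returns true if cancel() both interrupted the task and cancelled the result. */
    static boolean cancelStopsTask() {
        ExecutorService service = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch interrupted = new CountDownLatch(1);
        try {
            TaskHandle<String> handle = submit(service, () -> {
                started.countDown();
                try {
                    Thread.sleep(10_000); // stand-in for a slow fetch
                    return "data";
                } catch (InterruptedException e) {
                    interrupted.countDown();
                    throw e;
                }
            });
            started.await();
            handle.cancel();
            return interrupted.await(2, TimeUnit.SECONDS) && handle.result().isCancelled();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            service.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("task stopped: " + cancelStopsTask());
    }
}
```

The trade-off is that callers must hold on to the handle rather than a plain CompletableFuture; calling cancel on result() directly would still leave the task running.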
Upvotes: 5