user4444533

Cancellation of CompletableFuture controlled by ExecutorService

I have an ExecutorService which forwards the computed data to a CompletableFuture:

class DataRetriever {
    private final ExecutorService service = ...;

    public CompletableFuture<Data> retrieve() {
        final CompletableFuture<Data> future = new CompletableFuture<>();

        service.execute(() -> {
            final Data data = ... fetch data ...
            future.complete(data);
        });
        return future;
    }
}

I want the client/user to be able to cancel the task:

final DataRetriever retriever = new DataRetriever();
final CompletableFuture<Data> future = retriever.retrieve();

future.cancel(true);

This does not work: it cancels the outer CompletableFuture, but not the inner future (the task scheduled in the executor service), which keeps running.

Is it somehow possible to propagate cancel() on the outer future to the inner future?

Upvotes: 3

Views: 5416

Answers (4)

Valery Silaev

Reputation: 319

With my Tascalate Concurrent library, your code may be rewritten as follows:

class DataRetriever {
  private final ExecutorService service = ...;

  public Promise<Data> retrieve() {
    return CompletableTask.supplyAsync(() -> {
      final Data data = ... fetch data ...
      return data;
    }, service);
  }
}

Promise and CompletableTask are classes from my library; you can read more about them in my blog.

Upvotes: 1

Sam

Reputation: 9944

Another solution, which Pillar touched on, is to extend CompletableFuture. Here's one method that's quite similar to your existing code. It also handles exceptions, which is a nice bonus.

class CancelableFuture<T> extends CompletableFuture<T> {
    private Future<?> inner;

    /**
     * Creates a new CancelableFuture which will be completed by calling the
     * given {@link Callable} via the provided {@link ExecutorService}.
     */
    public CancelableFuture(Callable<T> task, ExecutorService executor) {
        this.inner = executor.submit(() -> complete(task));
    }

    /**
     * Completes this future by executing a {@link Callable}. If the call throws
     * an exception, the future will complete with that exception. Otherwise,
     * the future will complete with the value returned from the callable.
     */
    private void complete(Callable<T> callable) {
        try {
            T result = callable.call();
            complete(result);
        } catch (Exception e) {
            completeExceptionally(e);
        }
    }

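    @Override
    public boolean cancel(boolean mayInterrupt) {
        // Cancel this future first so that an interrupted task cannot race
        // ahead and complete it with an InterruptedException instead.
        boolean cancelled = super.cancel(mayInterrupt);
        inner.cancel(mayInterrupt);
        return cancelled;
    }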
}

Then, in DataRetriever, you can simply do:

public CompletableFuture<Data> retrieve() {
    return new CancelableFuture<>(() -> {... fetch data ...}, service);
}

Upvotes: 1

Sam

Reputation: 9944

By adding an exception handler to the outer future, you can have the call to cancel be passed down to the inner future. This works because CompletableFuture.cancel causes the future to complete exceptionally with a CancellationException.

private final ExecutorService service = ...;

public CompletableFuture<Data> retrieve() {
    final CompletableFuture<Data> outer = new CompletableFuture<>();

    final Future<?> inner = service.submit(() -> {
        ...
        outer.complete(data);
    });

    outer.exceptionally((error) -> {
        if (error instanceof CancellationException) {
            inner.cancel(true);
        }
        return null; // must return something, because 'exceptionally' expects a Function
    });

    return outer;
}

The call to outer.exceptionally creates a new CompletableFuture, so it doesn't affect the cancellation or exception status of outer itself. You can still append any other CompletionStage you like to outer, including another exceptionally stage, and it will operate as expected.
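
To check this behaviour yourself, here is a minimal, self-contained sketch. The class name PropagationDemo, the String payload and the Thread.sleep standing in for the real fetch are all assumptions made for illustration; they are not part of the original code.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class PropagationDemo {

    /**
     * Cancels the outer future and reports whether the worker thread was
     * interrupted while the outer future itself still reads as cancelled.
     */
    static boolean cancelReachesTask() {
        ExecutorService service = Executors.newSingleThreadExecutor();
        CompletableFuture<String> outer = new CompletableFuture<>();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch interrupted = new CountDownLatch(1);

        Future<?> inner = service.submit(() -> {
            started.countDown();
            try {
                Thread.sleep(10_000); // hypothetical stand-in for the fetch
                outer.complete("data");
            } catch (InterruptedException e) {
                interrupted.countDown(); // the cancel reached this thread
            }
        });

        // Side-effect-only handler; the stage it returns is ignored on purpose.
        outer.exceptionally(error -> {
            if (error instanceof CancellationException) {
                inner.cancel(true);
            }
            return null;
        });

        try {
            started.await();     // make sure the task is actually running
            outer.cancel(true);  // the client only ever sees 'outer'
            return interrupted.await(1, TimeUnit.SECONDS) && outer.isCancelled();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            service.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("cancel reached task: " + cancelReachesTask());
    }
}
```

Note that the handler runs on whichever thread calls outer.cancel, so inner.cancel(true) is executed by the caller, after outer is already marked as cancelled.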

Upvotes: 0

Savior

Reputation: 3531

CompletableFuture#cancel really only serves the purpose of marking a CompletableFuture as cancelled. It does nothing to notify an executing task to stop, because it has no relationship to any such task.

The javadoc hints at this:

mayInterruptIfRunning - this value has no effect in this implementation because interrupts are not used to control processing.

The inner future in your example is a Future, not a CompletableFuture, and it does have a relationship to the executing Runnable (or Callable). Internally, since it knows which Thread the task is executing on, it can send that thread an interrupt to attempt to stop it.
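
The difference between the two cancel calls can be observed with a small experiment. This is only a sketch: the class name CancelDemo, the one-second wait and the Thread.sleep placeholder for the real work are assumptions chosen for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class CancelDemo {

    /**
     * Starts a long-running task, cancels either the Future returned by
     * submit() or the CompletableFuture, and returns true if the worker
     * thread was interrupted within one second.
     */
    static boolean wasInterrupted(boolean cancelInnerFuture) {
        ExecutorService service = Executors.newSingleThreadExecutor();
        CompletableFuture<String> outer = new CompletableFuture<>();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch interrupted = new CountDownLatch(1);

        Future<?> inner = service.submit(() -> {
            started.countDown();
            try {
                Thread.sleep(10_000); // hypothetical long-running work
                outer.complete("data");
            } catch (InterruptedException e) {
                interrupted.countDown();
            }
        });

        try {
            started.await();
            if (cancelInnerFuture) {
                inner.cancel(true);  // knows the worker thread, interrupts it
            } else {
                outer.cancel(true);  // only marks 'outer' as cancelled
            }
            return interrupted.await(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            service.shutdownNow(); // clean up the worker in either case
        }
    }

    public static void main(String[] args) {
        System.out.println("CompletableFuture.cancel interrupts: " + wasInterrupted(false));
        System.out.println("Future.cancel interrupts: " + wasInterrupted(true));
    }
}
```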

One option is to return a tuple of some sort (e.g. a POJO) that holds a reference to both your CompletableFuture and the Future returned by ExecutorService#submit. You can use the Future to cancel the task if you need to. You'd have to remember to also cancel or complete your CompletableFuture, so that other parts of your code waiting on it don't remain blocked forever.
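
A rough sketch of such a holder might look like the following. The class name CancellableResult and its submit, result, task and cancel methods are hypothetical names invented for this example, not an existing API.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

/** Hypothetical holder pairing the result future with the executor's Future. */
public final class CancellableResult<T> {
    private final CompletableFuture<T> result;
    private final Future<?> task;

    private CancellableResult(CompletableFuture<T> result, Future<?> task) {
        this.result = result;
        this.task = task;
    }

    /** Submits the work and wires its outcome into a CompletableFuture. */
    public static <T> CancellableResult<T> submit(Callable<T> work, ExecutorService service) {
        CompletableFuture<T> result = new CompletableFuture<>();
        Future<?> task = service.submit(() -> {
            try {
                result.complete(work.call());
            } catch (Exception e) {
                result.completeExceptionally(e);
            }
        });
        return new CancellableResult<>(result, task);
    }

    public CompletableFuture<T> result() {
        return result;
    }

    public Future<?> task() {
        return task;
    }

    /**
     * Cancels the result first, so waiters are released and an interrupted
     * task cannot complete it with an InterruptedException, then interrupts
     * the running task.
     */
    public void cancel() {
        result.cancel(false);
        task.cancel(true);
    }
}
```

A client would then call CancellableResult.submit(() -> fetch(), service), chain stages on result(), and call cancel() on the holder rather than on the CompletableFuture directly (fetch() being whatever work you need to run).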

Upvotes: 5
