a3dsfcv

Reputation: 1274

CompletableFuture does not complete on timeout

I have a method, read, that reads from an input stream. I want the future to complete if read has not finished by the time the timeout elapses.

public static void main(String[] args) {
   CompletableFuture<?> future = new CompletableFuture<>();
   future
      .thenRunAsync(() -> {
         try {
            read(data);
         } catch (IOException e) {
            future.completeExceptionally(e);
         }
      })
      .orTimeout(1, TimeUnit.SECONDS);

   future.join();
} 

But when I run this code, it does not time out; it keeps waiting on the input stream anyway.

Upvotes: 0

Views: 3350

Answers (1)

Slaw

Reputation: 46170

There are at least two problems with your code:

  1. Nothing is being executed. You create a CompletableFuture and then invoke thenRunAsync on it. The stage created by thenRunAsync will only trigger once the previous stage has completed. Since you never complete the original CompletableFuture this will never happen. You also end up joining on a future that will never complete.

  2. You're joining on the wrong CompletableFuture. Methods such as thenRunAsync return a new instance, which creates a sort of "chain" of stages. Each stage is triggered by the completion of its "parent" stage. orTimeout returns the instance it was invoked on, which in your code is the stage returned by thenRunAsync, not the original future that you join on. To fully understand this I recommend reading the documentation of CompletionStage.
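
Both points can be checked with a small sketch. The class and method names below are made up for illustration, and an empty Runnable stands in for the call to read:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

class StageChainDemo {

    /** Returns true if thenRunAsync hands back a different object than its parent. */
    static boolean dependentIsNewInstance() {
        CompletableFuture<Void> parent = new CompletableFuture<>();
        CompletableFuture<Void> child = parent.thenRunAsync(() -> {});
        return parent != child;
    }

    /** Returns true if the dependent action ran only after the parent was completed. */
    static boolean runsOnlyAfterParentCompletes() {
        CompletableFuture<Void> parent = new CompletableFuture<>();
        AtomicBoolean ran = new AtomicBoolean(false);
        CompletableFuture<Void> child = parent.thenRunAsync(() -> ran.set(true));

        // The parent is still incomplete, so the action has not been triggered.
        boolean ranBefore = ran.get();

        // Completing the parent triggers the dependent stage.
        parent.complete(null);
        child.join();

        return !ranBefore && ran.get();
    }

    public static void main(String[] args) {
        System.out.println(dependentIsNewInstance());
        System.out.println(runsOnlyAfterParentCompletes());
    }
}
```

In the question's code nobody ever calls complete on the original future, so the equivalent of child never runs and the join on the original future never returns.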

Here's an example of your code working how I suspect you want:

public static void main(String[] args) {
  CompletableFuture.runAsync(
          () -> {
            try {
              read(data);
            } catch (IOException ex) {
              throw new UncheckedIOException(ex);
            }
          })
      .orTimeout(1L, TimeUnit.SECONDS)
      .join();
}

Some notes:

  • Used CompletableFuture#runAsync(Runnable) to create a "primordial" stage. This stage will complete when the Runnable completes and the common ForkJoinPool is used to execute the Runnable.

  • If and when thrown, the UncheckedIOException in the runAsync stage will cause the stage to complete exceptionally.

  • The #join() method is invoked on the instance returned by the orTimeout(1L, TimeUnit.SECONDS) call. Now if the timeout elapses the call to join() will throw a CompletionException wrapping a TimeoutException.
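
To see the last note in action, here is a minimal sketch that assumes Java 9 or later (needed for orTimeout). slowRead is a hypothetical stand-in for read(data) that just sleeps, and the class and method names are invented for the example:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;

class TimeoutJoinDemo {

    /** Hypothetical stand-in for read(data): blocks for the given time. */
    static void slowRead(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
    }

    /** Returns the cause of the exception thrown by join(), or null if join() returned normally. */
    static Throwable joinWithTimeout(long workMillis, long timeoutMillis) {
        try {
            CompletableFuture.runAsync(() -> slowRead(workMillis))
                    .orTimeout(timeoutMillis, TimeUnit.MILLISECONDS)
                    .join();
            return null;
        } catch (CompletionException ex) {
            // join() wraps the failure; the TimeoutException is the cause.
            return ex.getCause();
        }
    }

    public static void main(String[] args) {
        System.out.println(joinWithTimeout(2_000, 200)); // work outlasts the timeout
        System.out.println(joinWithTimeout(50, 2_000));  // work finishes in time
    }
}
```

When the work outlasts the timeout, the returned cause is a java.util.concurrent.TimeoutException; when it finishes in time, join() returns normally and the method returns null.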

Warning: The call to read is not interrupted [1] when the timeout elapses, meaning it will continue to execute in the background. This is because CompletableFuture has no reference to the executing threads and thus cannot interrupt them.

[1] Assuming an interruption would even have an effect.
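
The background behaviour described in the warning can be observed with a rough sketch like the following. It assumes Java 9 or later, uses Thread.sleep as a hypothetical stand-in for a blocking read(data), and the names are invented for the example:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class BackgroundWorkDemo {

    /** Returns true if the stage timed out and the task still ran to completion afterwards. */
    static boolean taskOutlivesTimeout() {
        CountDownLatch finished = new CountDownLatch(1);
        boolean timedOut = false;

        try {
            CompletableFuture.runAsync(() -> {
                try {
                    Thread.sleep(500);    // stand-in for a blocking read(data)
                    finished.countDown(); // reached only if the sleep was not interrupted
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                }
            }).orTimeout(100, TimeUnit.MILLISECONDS).join();
        } catch (CompletionException ex) {
            timedOut = ex.getCause() instanceof TimeoutException;
        }

        try {
            // Wait for the task itself; it keeps going although the stage already failed.
            return timedOut && finished.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(taskOutlivesTimeout());
    }
}
```

The sleep here would respond to an interrupt if one were delivered, so the latch being counted down shows that none was. A real blocking stream read may not respond to interruption at all, which is the point of the footnote.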

Upvotes: 1
