user1142317
user1142317

Reputation: 583

ExecutorService.invokeAll(timeout, TimeUnit) vs Future.get(timeout, TimeUnit)

I am using the ExecutorService.invokeAll(Callable, timeout, TimeUnit) method

And inside each of the Callable submitted to ExecutorService I have a future.get()

Will the future.get() would still be running in the background even if the executorService has timed out ?

Do I have to specify the timeout on the future.get(timeout, TimeUnit) as well and throw a TimeoutException so as to make sure the future is terminated ?

Upvotes: 3

Views: 6097

Answers (2)

Holger
Holger

Reputation: 298233

As the documentation states:

Upon return, tasks that have not completed are cancelled.

It’s not stated explicitly, but it uses Future.cancel(true), i.e. interrupt the running tasks. Since Future.get() supports interruption, this will cause it to complete by throwing an InterruptedException. If your Callable does not catch it nor does anything that could reset the interruption state, this implies that the callables will stop waiting in Future.get() if the timeout specified to invokeAll elapsed.

However, invokeAll only cancels the futures, hence, sends the interruption signal but does not wait for the threads to react on it and complete the execution of the Callable’s code. So by the time, invokeAll completes due to a timeout, some of the threads might still run on already cancelled tasks. But if these tasks merely consist of a future::get, this should not be an issue.


But if all you want to do, is to wait for the completion of a list of existing Futures, you can do this more efficient. After all, you are wrapping each Future into a Callable invoking Future.get, which invokeAll will wrap into another Future, potentially blocking one worker thread per future, followed by waiting for the completion of all of these future. The last step is exactly what this task is about, so you could do it without the previous steps, e.g.

public static void waitForAll(Collection<? extends Future<?>> futures,
                              long timeout, TimeUnit unit)
    throws InterruptedException {

    long nanos = unit.toNanos(timeout);
    boolean done = false;
    try {
        final long deadline = System.nanoTime() + nanos;
        final int size = futures.size();
        for(Future<?> f: futures) {
            if(!f.isDone()) {
                if (nanos <= 0L) return;
                try { f.get(nanos, TimeUnit.NANOSECONDS); }
                catch(CancellationException | ExecutionException ignore) {}
                catch(TimeoutException toe) { return; }
                nanos = deadline - System.nanoTime();
            }
        }
        done = true;
    }
    finally { if (!done) for(Future<?> f: futures) f.cancel(true); }
}

This is basically how AbstractExecutorService waits for the completion of all futures, after having submitted all Callables, creating a Future for each. As said, if all you want to do is to wait for a list of existing futures, you can do this directly without wasting resources by submitting jobs to an executor. Another benefit of using this method is that this will cancel the original futures rather than just cancelling the jobs that wait for the futures. You can also rely on this method cancelling with interruption whereas this property is not specified explicitly for invokeAll,

Upvotes: 2

daniu
daniu

Reputation: 15008

From the Javadoc

Executes the given tasks, returning a list of Futures holding their status and results when all complete or the timeout expires

So 1. all results will have been provided by all Callables, including the result from the future.get() call inside, and 2. once invokeAll returns (before the timeout, that is), all futures will have terminated.

Upvotes: 0

Related Questions