mindreader
mindreader

Reputation: 1812

ExecutorService.awaitTermination() never times out

I'm trying to implement a function where either the callables finish within stipulated time or the operation times out. I had hoped that ExecutorService.awaitTermination() would do this but was surprised to see that it doesn't. The code is below. The run never completes.

public class Counter implements Callable<Void> {

    public static void main(String[] args) throws InterruptedException {
        final Map<String, Counter> map = new HashMap<>();
        map.put("", new Counter());
        final Map<String, Future<Void>> result = executeTasksInParallel(map);
        final Future<Void> voidFuture = result.get("");
        try {
            voidFuture.get();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public Void call() throws Exception {
        for (long i = 0L; i < Long.MAX_VALUE; i++);
        return null;
    }

    public static <K, V> Map<K, Future<V>> executeTasksInParallel(final Map<K, ? extends Callable<V>> callablesById) throws InterruptedException {
        final Map<K, Future<V>> resultFuturesById = new HashMap<>();
        final ExecutorService executorService = Executors.newFixedThreadPool(callablesById.size());
        for (final Map.Entry<K, ? extends Callable<V>> callableByIdEntry : callablesById.entrySet()) {
            final K id = callableByIdEntry.getKey();
            final Callable<V> callable = callableByIdEntry.getValue();
            final Future<V> resultFuture = executorService.submit(callable);
            resultFuturesById.put(id, resultFuture);
        }
        executorService.shutdown();
        executorService.awaitTermination(5L, TimeUnit.SECONDS);
        return resultFuturesById;
    }
}

Am I missing something here? Thanks!

UPDATE:

I tried replacing try block content with below to avoid Future.get() from blocking but that didn't help either

if (voidFuture.isDone()) {
   voidFuture.get();
}

Upvotes: 1

Views: 1963

Answers (3)

starikoff
starikoff

Reputation: 1650

  1. Use shutdownNow() as Joe C has specified...
  2. ...But it will only work if your code in call() allows it to, for example by checking if the current thread is being interruped. See e.g. this question and its answers for details. Occasionally you might go without this "cooperative" behavior in your loop if it calls (directly or indirectly) methods that handle interrupt requests properly by throwing an InterruptedException (examples are Thread.sleep(...), Object.wait(...), Future.get(...), blocking operations on a channel that implements InterruptibleChannel etc). EDIT: ...and if the InterruptedException that is thrown is not suppressed.
  3. And yes, only call get() if the future isDone() (because it's on the main thread not managed by your executorService).

The final code would be

public class Counter implements Callable<Void> {

    public static void main(String[] args) throws InterruptedException {
        final Map<String, Counter> map = new HashMap<>();
        map.put("", new Counter());
        final Map<String, Future<Void>> result = executeTasksInParallel(map);
        final Future<Void> voidFuture = result.get("");
        try {
            if (voidFuture.isDone()) {
                voidFuture.get();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public Void call() throws Exception {
        for (long i = 0L; i < Long.MAX_VALUE; i++) {
            if (Thread.currentThread().isInterrupted()) {
                Thread.currentThread().interrupt(); // restore interrupted flag
                return null;
            }
            /* or e.g. throw an exception */
        }
        return null;
    }

    public static <K, V> Map<K, Future<V>> executeTasksInParallel(
            final Map<K, ? extends Callable<V>> callablesById)
            throws InterruptedException {
        final Map<K, Future<V>> resultFuturesById = new HashMap<>();
        final ExecutorService executorService =
            Executors.newFixedThreadPool(callablesById.size());
        for (final Map.Entry<K, ? extends Callable<V>> callableByIdEntry : callablesById
            .entrySet()) {
            final K id = callableByIdEntry.getKey();
            final Callable<V> callable = callableByIdEntry.getValue();
            final Future<V> resultFuture = executorService.submit(callable);
            resultFuturesById.put(id, resultFuture);
        }
        executorService.shutdown();
        executorService.awaitTermination(5L, TimeUnit.SECONDS);
        executorService.shutdownNow();
        return resultFuturesById;
    }
}

Upvotes: 2

Joe C
Joe C

Reputation: 15684

awaitTermination() doesn't try to kill the running tasks. Once awaitTermination() finishes, you should call shutdownNow() to attempt to kill what is still there.

https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorService.html#shutdownNow()

Upvotes: 1

passion
passion

Reputation: 1360

Doc of awaintTermination :

Blocks until all tasks have completed execution after a shutdown request, or the timeout occurs, or the current thread is interrupted, whichever happens first.

It will be done after 5 seconds , but generated thread is still working and it is not a daemon thread , so you code will keep working until child thread's termination.

And voidFuture.get() will block until returned .

Upvotes: 0

Related Questions