Tom

Reputation: 19302

Handling exceptions from Java ExecutorService tasks

I'm trying to use Java's ThreadPoolExecutor class to run a large number of heavyweight tasks with a fixed number of threads. Each of the tasks has many places where it may fail due to exceptions.

I've subclassed ThreadPoolExecutor and overridden the afterExecute method, which is supposed to provide any uncaught exceptions encountered while running a task. However, I can't seem to make it work.

For example:

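import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolErrors extends ThreadPoolExecutor {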
    public ThreadPoolErrors() {
        super(  1, // core threads
                1, // max threads
                1, // timeout
                TimeUnit.MINUTES, // timeout units
                new LinkedBlockingQueue<Runnable>() // work queue
        );
    }

    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        if(t != null) {
            System.out.println("Got an error: " + t);
        } else {
            System.out.println("Everything's fine--situation normal!");
        }
    }

    public static void main( String [] args) {
        ThreadPoolErrors threadPool = new ThreadPoolErrors();
        threadPool.submit( 
                new Runnable() {
                    public void run() {
                        throw new RuntimeException("Ouch! Got an error.");
                    }
                }
        );
        threadPool.shutdown();
    }
}

The output from this program is "Everything's fine--situation normal!" even though the only Runnable submitted to the thread pool throws an exception. Any clue to what's going on here?

Thanks!

Upvotes: 252

Views: 247335

Answers (14)

jjazzboss

Reputation: 1422

Here is a helper class which makes unexpected Exceptions or Errors visible, typically when using an ExecutorService.

How to use:

var aFuture = executorService.submit(new CheckedRunnable(() -> doStuff()));

The class:

/**
 * A wrapper for a runnable which makes unexpected runtime problems visible.
 * <p>
 * By default a Runnable passed to an ExecutorService will stop with no info
 * if an unexpected Exception or Error occurs. Use this wrapper to make sure that
 * such Exception or Error is logged with a stack trace.
 */
public class CheckedRunnable implements Runnable {

    private final Runnable r;

    public CheckedRunnable(Runnable r) {
        this.r = r;
    }

    @Override
    public void run() {
        try {
            r.run();
        } catch (Throwable ex) {
            ex.printStackTrace();
            throw ex;
        }
    }
}

Upvotes: 0

skaffman

Reputation: 403591

WARNING: It should be noted that this solution will block the calling thread in future.get().


If you want to process exceptions thrown by the task, then it is generally better to use Callable rather than Runnable.

Callable.call() is permitted to throw checked exceptions, and these get propagated back to the calling thread:

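Callable<?> task = ...
Future<?> future = executor.submit(task);
// do something else in the meantime, and then...
try {
   future.get(); // blocks until the task has finished
} catch (ExecutionException ex) {
   // getCause() is the exception thrown by call()
   ex.getCause().printStackTrace();
} catch (InterruptedException ex) {
   // interrupted while waiting; restore the interrupt flag
   Thread.currentThread().interrupt();
}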

If Callable.call() throws an exception, this will be wrapped in an ExecutionException and thrown by Future.get().

This is likely to be much preferable to subclassing ThreadPoolExecutor. It also gives you the opportunity to re-submit the task if the exception is a recoverable one.
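
For completeness, here is a minimal self-contained sketch of the Callable approach. The class and method names (CallableFailureDemo, runAndCollectFailure) and the failing task are made up for illustration; only the java.util.concurrent calls come from the JDK.

```java
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class CallableFailureDemo {

    /** Submits a failing task and returns the exception it threw, or null on success. */
    static Throwable runAndCollectFailure() {
        ExecutorService executor = Executors.newFixedThreadPool(1);
        try {
            // Hypothetical task: call() is allowed to throw a checked exception.
            Callable<String> task = () -> {
                throw new IOException("Ouch! Got an error.");
            };
            Future<String> future = executor.submit(task);
            try {
                future.get(); // blocks until the task finishes
                return null;  // the task completed normally
            } catch (ExecutionException ex) {
                return ex.getCause(); // the original exception from call()
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt(); // restore the interrupt flag
                return ex;
            }
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Task failed with: " + runAndCollectFailure());
    }
}
```

The point of the sketch is that the caller decides what to do with the cause, for example log it or re-submit the task, instead of the pool doing it.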

Upvotes: 265

Delark

Reputation: 1323

The doc's example wasn't giving me the results I wanted.

When a thread's work was abandoned (with explicit interrupt() calls), exceptions were appearing.

I also wanted to keep the "System.exit" behavior that a normal main thread has with a typical throw, so that the programmer is not forced to worry about the code's context (... a thread). If any error appears, it must either be a programming error, or the case must be handled in place with a manual catch... no need for overcomplexity, really.

So I changed the code to match my needs.

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        if (t == null && r instanceof Future<?>) {
            Future<?> future = (Future<?>) r;
            boolean terminate = false;
            try {
                future.get();
            } catch (ExecutionException e) {
                terminate = true;
                e.printStackTrace();
            } catch (InterruptedException | CancellationException ie) { // ignore/reset
                Thread.currentThread().interrupt();
            } finally {
                if (terminate) System.exit(0);
            }
        }
    }

Be cautious though: this code basically turns your threads into a main thread exception-wise, while keeping all its parallel properties... But let's be real, designing an architecture around the system's parallel mechanism (extends Thread) is the wrong approach IMHO... unless an event-driven design is strictly required... but then, if that is the requirement, the question is: is the ExecutorService even needed in this case?... Maybe not.

Upvotes: 1

ccleve

Reputation: 15809

This is similar to mmm's solution, but a bit more understandable. Have your tasks extend an abstract class that wraps the run() method.

public abstract class Task implements Runnable {

    public abstract void execute();

    public void run() {
      try {
        execute();
      } catch (Throwable t) {
        // handle it  
      }
    }
}


public class MySampleTask extends Task {
    public void execute() {
        // heavy, error-prone code here
    }
}

Upvotes: 2

mjs

Reputation: 22379

I got around it by wrapping the supplied runnable submitted to the executor.

CompletableFuture.runAsync(() -> {
        try {
              runnable.run();
        } catch (Throwable e) {
              Log.info(Concurrency.class, "runAsync", e);
        }
}, executorService);
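
A variation on the same idea, as a rough sketch: instead of a try/catch inside the lambda, let CompletableFuture capture the failure and inspect it in whenComplete. The names RunAsyncFailureDemo and runAndCollectFailure are hypothetical, and the AtomicReference is only there so the sketch can hand the failure back to the caller; in real code you would log it instead.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

class RunAsyncFailureDemo {

    /** Runs the task on an executor and returns the failure it raised, or null. */
    static Throwable runAndCollectFailure(Runnable runnable) {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        AtomicReference<Throwable> failure = new AtomicReference<>();
        try {
            CompletableFuture<Void> done = CompletableFuture
                    .runAsync(runnable, executorService)
                    .whenComplete((ignored, ex) -> {
                        if (ex != null) {
                            // The failure may arrive wrapped in a CompletionException.
                            Throwable cause =
                                    (ex instanceof CompletionException && ex.getCause() != null)
                                            ? ex.getCause() : ex;
                            failure.set(cause);
                        }
                    });
            try {
                done.join(); // wait until the callback has run
            } catch (CompletionException expected) {
                // already recorded by whenComplete above
            }
        } finally {
            executorService.shutdown();
        }
        return failure.get();
    }
}
```

Unlike the try/catch version, the returned future still completes exceptionally here, so downstream stages can react to the failure as well.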

Upvotes: 19

nos

Reputation: 229264

From the docs:

Note: When actions are enclosed in tasks (such as FutureTask) either explicitly or via methods such as submit, these task objects catch and maintain computational exceptions, and so they do not cause abrupt termination, and the internal exceptions are not passed to this method.

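When you submit a Runnable with submit(), it gets wrapped in a FutureTask. The FutureTask catches the exception and stores it in the returned Future, so afterExecute is called with t == null.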

Your afterExecute should be something like this:

public final class ExtendedExecutor extends ThreadPoolExecutor {

    // ...

    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        if (t == null && r instanceof Future<?>) {
            try {
                Future<?> future = (Future<?>) r;
                if (future.isDone()) {
                    future.get();
                }
            } catch (CancellationException ce) {
                t = ce;
            } catch (ExecutionException ee) {
                t = ee.getCause();
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
        }
        if (t != null) {
            System.out.println(t);
        }
    }
}

Upvotes: 179

obesga_tirant

Reputation: 31

This works

  • It is derived from SingleThreadExecutor, but you can adapt it easily
  • It uses Java 8 lambdas, but that is easy to change

It creates an Executor with a single thread that can accept many tasks; it waits for the current task to finish before starting the next one.

In case of an uncaught error or exception, the uncaughtExceptionHandler will catch it.

public final class SingleThreadExecutorWithExceptions {

    public static ExecutorService newSingleThreadExecutorWithExceptions(final Thread.UncaughtExceptionHandler uncaughtExceptionHandler) {

        ThreadFactory factory = (Runnable runnable)  -> {
            final Thread newThread = new Thread(runnable, "SingleThreadExecutorWithExceptions");
            newThread.setUncaughtExceptionHandler( (final Thread caugthThread,final Throwable throwable) -> {
                uncaughtExceptionHandler.uncaughtException(caugthThread, throwable);
            });
            return newThread;
        };
        return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                        0L, TimeUnit.MILLISECONDS,
                        new LinkedBlockingQueue<Runnable>(),
                        factory){


                    protected void afterExecute(Runnable runnable, Throwable throwable) {
                        super.afterExecute(runnable, throwable);
                        if (throwable == null && runnable instanceof Future<?>) {
                            try {
                                Future<?> future = (Future<?>) runnable;
                                if (future.isDone()) {
                                    future.get();
                                }
                            } catch (CancellationException ce) {
                                throwable = ce;
                            } catch (ExecutionException ee) {
                                throwable = ee.getCause();
                            } catch (InterruptedException ie) {
                                Thread.currentThread().interrupt(); // ignore/reset
                            }
                        }
                        if (throwable != null) {
                            uncaughtExceptionHandler.uncaughtException(Thread.currentThread(),throwable);
                        }
                    }
                });
    }



    private static class FinalizableDelegatedExecutorService
            extends DelegatedExecutorService {
        FinalizableDelegatedExecutorService(ExecutorService executor) {
            super(executor);
        }
        protected void finalize() {
            super.shutdown();
        }
    }

    /**
     * A wrapper class that exposes only the ExecutorService methods
     * of an ExecutorService implementation.
     */
    private static class DelegatedExecutorService extends AbstractExecutorService {
        private final ExecutorService e;
        DelegatedExecutorService(ExecutorService executor) { e = executor; }
        public void execute(Runnable command) { e.execute(command); }
        public void shutdown() { e.shutdown(); }
        public List<Runnable> shutdownNow() { return e.shutdownNow(); }
        public boolean isShutdown() { return e.isShutdown(); }
        public boolean isTerminated() { return e.isTerminated(); }
        public boolean awaitTermination(long timeout, TimeUnit unit)
                throws InterruptedException {
            return e.awaitTermination(timeout, unit);
        }
        public Future<?> submit(Runnable task) {
            return e.submit(task);
        }
        public <T> Future<T> submit(Callable<T> task) {
            return e.submit(task);
        }
        public <T> Future<T> submit(Runnable task, T result) {
            return e.submit(task, result);
        }
        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
                throws InterruptedException {
            return e.invokeAll(tasks);
        }
        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                             long timeout, TimeUnit unit)
                throws InterruptedException {
            return e.invokeAll(tasks, timeout, unit);
        }
        public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
                throws InterruptedException, ExecutionException {
            return e.invokeAny(tasks);
        }
        public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                               long timeout, TimeUnit unit)
                throws InterruptedException, ExecutionException, TimeoutException {
            return e.invokeAny(tasks, timeout, unit);
        }
    }



    private SingleThreadExecutorWithExceptions() {}
}

Upvotes: 3

Cristian Botiza

Reputation: 439

If you want to monitor the execution of tasks, you could spin up 1 or 2 threads (maybe more, depending on the load) and use them to take completed tasks from an ExecutorCompletionService wrapper.
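
A minimal sketch of that idea using java.util.concurrent.ExecutorCompletionService. For brevity it drains the completion service on the calling thread rather than on dedicated monitor threads, and the names CompletionServiceDemo and collectFailures are invented for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class CompletionServiceDemo {

    /** Runs all tasks and returns the exceptions thrown by the ones that failed. */
    static List<Throwable> collectFailures(List<Callable<Void>> tasks) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        CompletionService<Void> completed = new ExecutorCompletionService<>(pool);
        List<Throwable> failures = new ArrayList<>();
        try {
            for (Callable<Void> task : tasks) {
                completed.submit(task);
            }
            // take() hands back futures in completion order, one per submitted task.
            for (int i = 0; i < tasks.size(); i++) {
                Future<Void> future = completed.take();
                try {
                    future.get(); // the task is already done, so this returns at once
                } catch (ExecutionException ex) {
                    failures.add(ex.getCause());
                }
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt(); // stop monitoring, keep the flag set
        } finally {
            pool.shutdown();
        }
        return failures;
    }
}
```

To monitor in the background instead, the take() loop could be moved into its own thread, as the answer suggests.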

Upvotes: 1

Drew Wills

Reputation: 8446

The explanation for this behavior is right in the javadoc for afterExecute:

Note: When actions are enclosed in tasks (such as FutureTask) either explicitly or via methods such as submit, these task objects catch and maintain computational exceptions, and so they do not cause abrupt termination, and the internal exceptions are not passed to this method.
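
To see the note in action, here is a small sketch that runs the same failing Runnable once through submit() and once through execute() and records what afterExecute receives each time. AfterExecuteDemo, RecordingPool and observe are hypothetical names; the no-op uncaught exception handler is only there to keep the demo from printing a stack trace when the execute() task kills its worker thread.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class AfterExecuteDemo {

    /** Single-threaded pool that records the Throwable passed to afterExecute. */
    static class RecordingPool extends ThreadPoolExecutor {
        final List<String> seen = new CopyOnWriteArrayList<>();

        RecordingPool() {
            super(1, 1, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(),
                  runnable -> {
                      Thread thread = new Thread(runnable);
                      // Silence the default stack trace for the execute() failure.
                      thread.setUncaughtExceptionHandler((th, ex) -> { });
                      return thread;
                  });
        }

        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            super.afterExecute(r, t);
            seen.add(t == null ? "null" : t.getMessage());
        }
    }

    /** Returns what afterExecute saw: first for submit(), then for execute(). */
    static List<String> observe() {
        RecordingPool pool = new RecordingPool();
        Runnable failing = () -> { throw new RuntimeException("Ouch! Got an error."); };
        try {
            pool.submit(failing);  // wrapped in a FutureTask; exception stays in the Future
            pool.execute(failing); // run as-is; exception is passed to afterExecute
            pool.shutdown();
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
        return pool.seen;
    }
}
```

With one worker thread the tasks run in submission order, so the recorded list should hold "null" for the submit() call followed by the exception message for the execute() call.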

Upvotes: 20

Kanagavelu Sugumar

Reputation: 19270

This is because AbstractExecutorService.submit wraps your Runnable in a RunnableFuture (which is just a FutureTask), as shown below.

AbstractExecutorService.java

public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null); /////////HERE////////
    execute(ftask);
    return ftask;
}

Then execute passes it to a Worker, and Worker.run() calls the method below.

ThreadPoolExecutor.java

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();           /////////HERE////////
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

Finally, task.run() in the code above calls FutureTask.run(). Its catch block, shown below, stores the exception in the task instead of rethrowing it, which is why you are NOT getting the expected exception.

class FutureTask<V> implements RunnableFuture<V>

public void run() {
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call();
                ran = true;
            } catch (Throwable ex) {   /////////HERE////////
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

Upvotes: 1

CSchulz

Reputation: 11030

Another solution would be to use ManagedTask and ManagedTaskListener from the Java EE Concurrency Utilities (package javax.enterprise.concurrent).

You need a Callable or Runnable which implements the interface ManagedTask.

The method getManagedTaskListener returns the instance you want.

public ManagedTaskListener getManagedTaskListener() {

Then implement the taskDone method in your ManagedTaskListener:

@Override
public void taskDone(Future<?> future, ManagedExecutorService executor, Object task, Throwable exception) {
    if (exception != null) {
        LOGGER.log(Level.SEVERE, exception.getMessage());
    }
}

More details about managed task lifecycle and listener.

Upvotes: 5

Bass

Reputation: 5338

If your ExecutorService comes from an external source (i.e. it's not possible to subclass ThreadPoolExecutor and override afterExecute()), you can use a dynamic proxy to achieve the desired behavior:

public static ExecutorService errorAware(final ExecutorService executor) {
    return (ExecutorService) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
            new Class[] {ExecutorService.class},
            (proxy, method, args) -> {
                if (method.getName().equals("submit")) {
                    final Object arg0 = args[0];
                    if (arg0 instanceof Runnable) {
                        args[0] = new Runnable() {
                            @Override
                            public void run() {
                                final Runnable task = (Runnable) arg0;
                                try {
                                    task.run();
                                    if (task instanceof Future<?>) {
                                        final Future<?> future = (Future<?>) task;

                                        if (future.isDone()) {
                                            try {
                                                future.get();
                                            } catch (final CancellationException ce) {
                                                // Your error-handling code here
                                                ce.printStackTrace();
                                            } catch (final ExecutionException ee) {
                                                // Your error-handling code here
                                                ee.getCause().printStackTrace();
                                            } catch (final InterruptedException ie) {
                                                Thread.currentThread().interrupt();
                                            }
                                        }
                                    }
                                } catch (final RuntimeException re) {
                                    // Your error-handling code here
                                    re.printStackTrace();
                                    throw re;
                                } catch (final Error e) {
                                    // Your error-handling code here
                                    e.printStackTrace();
                                    throw e;
                                }
                            }
                        };
                    } else if (arg0 instanceof Callable<?>) {
                        args[0] = new Callable<Object>() {
                            @Override
                            public Object call() throws Exception {
                                final Callable<?> task = (Callable<?>) arg0;
                                try {
                                    return task.call();
                                } catch (final Exception e) {
                                    // Your error-handling code here
                                    e.printStackTrace();
                                    throw e;
                                } catch (final Error e) {
                                    // Your error-handling code here
                                    e.printStackTrace();
                                    throw e;
                                }
                            }
                        };
                    }
                }
                return method.invoke(executor, args);
            });
}

Upvotes: 0

yegor256

Reputation: 105173

I'm using VerboseRunnable class from jcabi-log, which swallows all exceptions and logs them. Very convenient, for example:

import com.jcabi.log.VerboseRunnable;
scheduler.scheduleWithFixedDelay(
  new VerboseRunnable(
    new Runnable() {
      public void run() { 
        // the code, which may throw
      }
    },
    true // it means that all exceptions will be swallowed and logged
  ),
  1, 1, TimeUnit.MILLISECONDS
);

Upvotes: 5

Kevin

Reputation: 30449

Instead of subclassing ThreadPoolExecutor, I would provide it with a ThreadFactory instance that creates new Threads and provides them with an UncaughtExceptionHandler.
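
A rough sketch of what that could look like. Note the caveat from the javadoc quoted elsewhere in this thread: the handler only fires for tasks started with execute(), because submit() wraps the task in a FutureTask that keeps the exception. The names HandlerFactoryDemo and runWithHandler are made up, and the latch exists only so the sketch can wait for the handler before returning.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class HandlerFactoryDemo {

    /** Returns what the handler saw after one failing execute() call, or null. */
    static Throwable runWithHandler() {
        AtomicReference<Throwable> caught = new AtomicReference<>();
        CountDownLatch handled = new CountDownLatch(1);

        // Every thread the pool creates gets the same uncaught exception handler.
        ThreadFactory factory = runnable -> {
            Thread thread = new Thread(runnable);
            thread.setUncaughtExceptionHandler((th, ex) -> {
                caught.set(ex);
                handled.countDown();
            });
            return thread;
        };

        ExecutorService pool = Executors.newFixedThreadPool(1, factory);
        try {
            // execute(), not submit(): submit() would store the exception in the Future.
            pool.execute(() -> { throw new RuntimeException("Ouch! Got an error."); });
            handled.await(10, TimeUnit.SECONDS); // the handler runs as the worker thread dies
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return caught.get();
    }
}
```

Keep in mind that the failing worker thread dies and the pool has to create a replacement, so this is more of a last-resort safety net than a per-task error channel.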

Upvotes: -5
