alexsmail

Reputation: 5813

Stateful exception handling with ThreadPoolExecutor

Basically, there are a couple of different strategies for exception handling when you use ThreadPoolExecutor:

  1. Thread.setUncaughtExceptionHandler() (and Thread.getDefaultUncaughtExceptionHandler())

    When a task is submitted with submit(), the exception is wrapped in the Future, so the UncaughtExceptionHandler is never called and this can't be used.

  2. Setting a ThreadFactory

    The only relevant part is calling Thread.setUncaughtExceptionHandler() on the newly created thread. But this has no effect; see item 1.

  3. Overriding ThreadPoolExecutor.afterExecute()

    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        if (t == null && r instanceof Future<?>) {
            try {
                Object result = ((Future<?>) r).get();
            } catch (CancellationException ce) {
                t = ce;
            } catch (ExecutionException ee) {
                t = ee.getCause();
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt(); // ignore/reset
            }
        }
        if (t != null) {
            logger.error("ThreadPoolExecutor.afterExecute", t);
        }
    }
    

    This approach almost works. If your exception handling is stateless, that is, you don't need to access the state of your original Runnable/Callable task, this is fine. In the stateful case you have no access to your original task (even reflection doesn't help, because the Runnable above will not hold the original task).
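To make the limitation concrete, here is a minimal sketch using only the JDK. The names AfterExecuteDemo and StatefulTask are made up for illustration. It submits a failing task and records two things: the class of the Runnable that afterExecute() receives, and whether the thread's UncaughtExceptionHandler fires.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class; the names are illustrative only.
class AfterExecuteDemo {

    // A task carrying state that an error handler might want to inspect.
    static class StatefulTask implements Runnable {
        final boolean failSilent;
        StatefulTask(boolean failSilent) { this.failSilent = failSilent; }
        @Override public void run() { throw new IllegalStateException("boom"); }
    }

    // Returns { class name seen by afterExecute, "true"/"false" for the uncaught handler }.
    static String[] observe() {
        final AtomicReference<String> seenClass = new AtomicReference<>();
        final AtomicBoolean handlerCalled = new AtomicBoolean(false);

        // Strategy 2 from the list: a ThreadFactory that installs a handler.
        ThreadFactory factory = r -> {
            Thread t = new Thread(r);
            t.setUncaughtExceptionHandler((thread, ex) -> handlerCalled.set(true));
            return t;
        };

        // Strategy 3 from the list: override afterExecute().
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), factory) {
            @Override
            protected void afterExecute(Runnable r, Throwable t) {
                super.afterExecute(r, t);
                seenClass.set(r.getClass().getName());
            }
        };

        Future<?> future = pool.submit(new StatefulTask(true));
        try {
            future.get();
        } catch (ExecutionException expected) {
            // The failure is only reachable through the Future.
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        pool.shutdown();
        try {
            // Wait for the worker to finish so afterExecute() has run.
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new String[] { seenClass.get(), String.valueOf(handlerCalled.get()) };
    }

    public static void main(String[] args) {
        String[] result = observe();
        System.out.println("afterExecute saw: " + result[0]);
        System.out.println("uncaught handler called: " + result[1]);
    }
}
```

With submit(), afterExecute() is handed the FutureTask wrapper rather than the StatefulTask itself, which is why the failSilent flag is out of reach there.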

How can I handle exceptions when I do need access to the state of the original task?

Upvotes: 4

Views: 2530

Answers (2)

alexsmail

Reputation: 5813

First of all, see "Handling Exceptions for ThreadPoolExecutor" for more background on the problem with the afterExecute() approach.

ThreadPoolExecutor has

protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) ;

and

protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value);

where callable and runnable are your original tasks, which you can decorate. This is the basic strategy. Below is working code using Spring (I removed the comments for clarity):

package org.springframework.scheduling.concurrent;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

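import org.springframework.core.task.AsyncListenableTaskExecutor;
import org.springframework.core.task.TaskDecorator;
import org.springframework.scheduling.SchedulingTaskExecutor;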
import org.springframework.scheduling.concurrent.ConcurrentTaskExecutor;
import org.springframework.scheduling.concurrent.ExecutorConfigurationSupport;
import org.springframework.scheduling.concurrent.ThreadPoolExecutorFactoryBean;
import org.springframework.util.Assert;

public class ThreadPoolTaskExecutor extends ExecutorConfigurationSupport implements AsyncListenableTaskExecutor, SchedulingTaskExecutor {

    private final Object poolSizeMonitor = new Object();

    private int corePoolSize = 1;

    private int maxPoolSize = Integer.MAX_VALUE;

    private int keepAliveSeconds = 60;

    private int queueCapacity = Integer.MAX_VALUE;

    private boolean allowCoreThreadTimeOut = false;
    //fix
    private CallableTransform callableTransform;

    private ThreadPoolExecutor threadPoolExecutor;


    public void setCorePoolSize(int corePoolSize) {
        synchronized (this.poolSizeMonitor) {
            this.corePoolSize = corePoolSize;
            if (this.threadPoolExecutor != null) {
                this.threadPoolExecutor.setCorePoolSize(corePoolSize);
            }
        }
    }

    public int getCorePoolSize() {
        synchronized (this.poolSizeMonitor) {
            return this.corePoolSize;
        }
    }

    public void setMaxPoolSize(int maxPoolSize) {
        synchronized (this.poolSizeMonitor) {
            this.maxPoolSize = maxPoolSize;
            if (this.threadPoolExecutor != null) {
                this.threadPoolExecutor.setMaximumPoolSize(maxPoolSize);
            }
        }
    }

    public int getMaxPoolSize() {
        synchronized (this.poolSizeMonitor) {
            return this.maxPoolSize;
        }
    }

    public void setKeepAliveSeconds(int keepAliveSeconds) {
        synchronized (this.poolSizeMonitor) {
            this.keepAliveSeconds = keepAliveSeconds;
            if (this.threadPoolExecutor != null) {
                this.threadPoolExecutor.setKeepAliveTime(keepAliveSeconds, TimeUnit.SECONDS);
            }
        }
    }

    public int getKeepAliveSeconds() {
        synchronized (this.poolSizeMonitor) {
            return this.keepAliveSeconds;
        }
    }

    public void setQueueCapacity(int queueCapacity) {
        this.queueCapacity = queueCapacity;
    }

    public void setAllowCoreThreadTimeOut(boolean allowCoreThreadTimeOut) {
        this.allowCoreThreadTimeOut = allowCoreThreadTimeOut;
    }


    //fix
    public void setCallableDecorator(CallableDecorator callableDecorator) {
        Assert.isNull(this.callableTransform, "You can't call setCallableDecorator() or setTaskDecorator() more than once");
        this.callableTransform = new CallableTransform() {

            @Override
            public Callable<?> decorate(Object originalTask) {
                return callableDecorator.decorate((Callable<?>) originalTask);
            }

            @Override
            public boolean isCallable() {
                return true;
            }
        };
    }

    //fix
    public void setTaskDecorator(TaskDecorator taskDecorator) {
        Assert.isNull(this.callableTransform, "You can't call setCallableDecorator() or setTaskDecorator() more than once");
        this.callableTransform = new CallableTransform() {

            @Override
            public Callable<?> decorate(Object originalTask) {
                Callable<?> ret= Executors.callable(taskDecorator.decorate((Runnable)originalTask));
                return ret;
            }

            @Override
            public boolean isCallable(){
                return false;
            }
        };
    }


    @Override
    protected ExecutorService initializeExecutor(
            ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {

        BlockingQueue<Runnable> queue = createQueue(this.queueCapacity);

        ThreadPoolExecutor executor;

        //fix
        if (this.callableTransform != null) {
            executor = new ThreadPoolExecutor(
                    this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
                    queue, threadFactory, rejectedExecutionHandler) {

                @Override
                protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
                    if(callableTransform==null){
                        return super.newTaskFor(callable);
                    }

                    Callable<?> wrapedCallable = null;

                    boolean isCallable = callableTransform.isCallable();
                    if(isCallable){
                        wrapedCallable = callableTransform.decorate(callable);
                    } else {
                        //callableTransform accepts Runnable, but we have Callable
                        throw new IllegalStateException("You use TaskDecorator, but submit Callable");
                    }

                    @SuppressWarnings("unchecked")
                    Callable<T> param = (Callable<T>)wrapedCallable;
                    return super.newTaskFor(param);
                }

                @Override
                protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
                    if(callableTransform==null){
                        return super.newTaskFor(runnable, value);
                    }

                    Callable<?> wrapedCallable = null;

                    boolean isRunnable = callableTransform.isRunnable();
                    if(isRunnable){
                        wrapedCallable = callableTransform.decorate(runnable);
                    } else {
                        //callableTransform accepts Callable, but we have Runnable
                        throw new IllegalStateException("You use CallableDecorator, but execute Runnable");
                    }

                    @SuppressWarnings("unchecked")
                    Callable<T> param = (Callable<T>)wrapedCallable;
                    return super.newTaskFor(param);
                }


            };

        } else {
            executor = new ThreadPoolExecutor(
                    this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
                    queue, threadFactory, rejectedExecutionHandler);

        }

        if (this.allowCoreThreadTimeOut) {
            executor.allowCoreThreadTimeOut(true);
        }

        this.threadPoolExecutor = executor;
        return executor;
    }


    protected BlockingQueue<Runnable> createQueue(int queueCapacity) {
        if (queueCapacity > 0) {
            return new LinkedBlockingQueue<>(queueCapacity);
        }
        else {
            return new SynchronousQueue<>();
        }
    }

    public ThreadPoolExecutor getThreadPoolExecutor() throws IllegalStateException {
        Assert.state(this.threadPoolExecutor != null, "ThreadPoolTaskExecutor not initialized");
        return this.threadPoolExecutor;
    }

    public int getPoolSize() {
        if (this.threadPoolExecutor == null) {
            // Not initialized yet: assume core pool size.
            return this.corePoolSize;
        }
        return this.threadPoolExecutor.getPoolSize();
    }

    public int getActiveCount() {
        if (this.threadPoolExecutor == null) {
            // Not initialized yet: assume no active threads.
            return 0;
        }
        return this.threadPoolExecutor.getActiveCount();
    }


    @FunctionalInterface
    public interface CallableDecorator {
        <V> Callable<V> decorate(Callable<V> task);
    }

    @FunctionalInterface
    static interface CallableTransform {
        Callable<?> decorate(Object originalTask);

        default boolean isCallable(){
            return true;
        }

        default boolean isRunnable(){
            return !isCallable();
        }
    }



    //rest of the code execute/submit override
    //...

    @Override
    public boolean prefersShortLivedTasks() {
        return true;
    }

}

A usage example follows:

    ThreadPoolTaskExecutor threadPoolFactory = new ThreadPoolTaskExecutor();
    threadPoolFactory.setCorePoolSize(4);
    threadPoolFactory.setMaxPoolSize(4);
    threadPoolFactory.setKeepAliveSeconds(0);


    CallableDecorator decorator = new CallableDecorator(){

        @Override
        public <T> Callable<T> decorate(Callable<T> task) {
            return () -> {
                try {
                    return task.call();
                }
                catch (Throwable e) {
                    synchronized (executor) {
                        if (!((MyCallable) task).failSilent) {   // note the use of the original task's state
                            log.error("Execution Failure!", e);
                        }
                    }
                    throw e;
                }
            };
        }
    };
    threadPoolFactory.setCallableDecorator(decorator);

    threadPoolFactory.initialize();
    executor = threadPoolFactory.getThreadPoolExecutor();

and further:

    executor.submit(new MyCallable(true));
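For readers not using Spring, the same newTaskFor() idea can be sketched with the JDK alone. This is an illustrative sketch and not the code above: StateAwareExecutor, FailurePolicy and FailingTask are hypothetical names, and the "log" is a list so that the effect is easy to observe.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical view of the task state the error handler needs.
interface FailurePolicy {
    String name();
    boolean failSilent();
}

// Hypothetical executor that decorates every Callable before it becomes a FutureTask.
class StateAwareExecutor extends ThreadPoolExecutor {
    final List<String> reported = new CopyOnWriteArrayList<>();

    StateAwareExecutor(int threads) {
        super(threads, threads, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
    }

    @Override
    protected <T> RunnableFuture<T> newTaskFor(final Callable<T> original) {
        Callable<T> decorated = () -> {
            try {
                return original.call();
            } catch (Exception e) {
                // The original task is still in scope, so its state is available.
                if (original instanceof FailurePolicy) {
                    FailurePolicy policy = (FailurePolicy) original;
                    if (!policy.failSilent()) {
                        reported.add(policy.name() + ": " + e.getMessage());
                    }
                }
                throw e; // still propagate to the Future
            }
        };
        return super.newTaskFor(decorated);
    }
}

// Hypothetical task that always fails.
class FailingTask implements Callable<String>, FailurePolicy {
    private final String name;
    private final boolean failSilent;
    FailingTask(String name, boolean failSilent) { this.name = name; this.failSilent = failSilent; }
    @Override public String name() { return name; }
    @Override public boolean failSilent() { return failSilent; }
    @Override public String call() { throw new IllegalStateException("boom"); }
}

class StateAwareExecutorDemo {
    static List<String> run() {
        StateAwareExecutor pool = new StateAwareExecutor(2);
        List<Future<String>> futures = new ArrayList<>();
        futures.add(pool.submit(new FailingTask("quiet", true)));
        futures.add(pool.submit(new FailingTask("loud", false)));
        for (Future<String> f : futures) {
            try {
                f.get();
            } catch (ExecutionException expected) {
                // both tasks fail; only the non-silent one is reported
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        pool.shutdown();
        return new ArrayList<>(pool.reported);
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

This sketch only overrides the Callable variant. Tasks passed to submit(Runnable) go through newTaskFor(Runnable, T), and tasks passed to execute() bypass newTaskFor() entirely, so those paths would need their own handling.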

Upvotes: 2

Gray

Reputation: 116918

Quoting the question: "Basically, there are a couple of different strategies for exception handling when you use ThreadPoolExecutor."

You could override ThreadPoolExecutor.beforeExecute(...), dig out your runnable (which is in there) via reflection, set a ThreadLocal, and then use it in afterExecute(...). However, this really feels like a hack and is very dependent on the TPE implementation.

I would instead wrap your Runnable or Callable in a wrapper that does the try/catch and logs the error. You would then add things to the thread pool with something like:

threadPool.submit(new RunnableWrapper(myRunnable));
// or
threadPool.submit(new CallableWrapper(myCallable));

These would have the try/catch/log mechanism and also have access to the Runnable for state evaluation. Anything else seems like a hack to me.
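One way such wrappers might look is sketched below. The class names RunnableWrapper and CallableWrapper come from the snippet above, but the bodies are an assumption: in particular, the BiConsumer failure callback stands in for whatever logging you use and is not part of any library.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;

// Sketch of a wrapper: runs the delegate, reports failures together with the delegate.
class RunnableWrapper implements Runnable {
    private final Runnable delegate;
    private final BiConsumer<Runnable, Throwable> onFailure; // assumed callback, e.g. a logger call

    RunnableWrapper(Runnable delegate, BiConsumer<Runnable, Throwable> onFailure) {
        this.delegate = delegate;
        this.onFailure = onFailure;
    }

    @Override
    public void run() {
        try {
            delegate.run();
        } catch (RuntimeException | Error e) {
            onFailure.accept(delegate, e); // the original task and its state are available here
            throw e;                       // keep the failure visible through the Future
        }
    }
}

// The same idea for Callable.
class CallableWrapper<V> implements Callable<V> {
    private final Callable<V> delegate;
    private final BiConsumer<Callable<V>, Throwable> onFailure;

    CallableWrapper(Callable<V> delegate, BiConsumer<Callable<V>, Throwable> onFailure) {
        this.delegate = delegate;
        this.onFailure = onFailure;
    }

    @Override
    public V call() throws Exception {
        try {
            return delegate.call();
        } catch (Exception e) {
            onFailure.accept(delegate, e);
            throw e;
        }
    }
}

class WrapperDemo {
    // Hypothetical job with an id that the failure handler wants to report.
    static class Job implements Runnable {
        final String id;
        Job(String id) { this.id = id; }
        @Override public void run() { throw new IllegalStateException("boom"); }
    }

    static String run() {
        ExecutorService threadPool = Executors.newSingleThreadExecutor();
        AtomicReference<String> seen = new AtomicReference<>();
        Future<?> f = threadPool.submit(new RunnableWrapper(
                new Job("job-42"),
                (task, ex) -> seen.set(((Job) task).id + " failed: " + ex.getMessage())));
        try {
            f.get();
        } catch (ExecutionException expected) {
            // already reported by the wrapper
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        threadPool.shutdown();
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```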

You could certainly override the submit(...) methods to do the wrapping of the jobs yourself. This seems a lot cleaner.
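As a rough sketch of that last option, the subclass below overrides two of the submit(...) overloads and wraps each job before delegating. WrappingExecutor and its onFailure callback are hypothetical names chosen for this example.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;

// Hypothetical executor that wraps every submitted job in a try/catch/report layer.
class WrappingExecutor extends ThreadPoolExecutor {
    private final BiConsumer<Object, Throwable> onFailure; // assumed callback, e.g. logging

    WrappingExecutor(int threads, BiConsumer<Object, Throwable> onFailure) {
        super(threads, threads, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        this.onFailure = onFailure;
    }

    @Override
    public <T> Future<T> submit(final Callable<T> task) {
        Callable<T> wrapped = () -> {
            try {
                return task.call();
            } catch (Exception e) {
                onFailure.accept(task, e); // original task is in scope
                throw e;
            }
        };
        return super.submit(wrapped);
    }

    @Override
    public Future<?> submit(final Runnable task) {
        Runnable wrapped = () -> {
            try {
                task.run();
            } catch (RuntimeException e) {
                onFailure.accept(task, e);
                throw e;
            }
        };
        return super.submit(wrapped);
    }
}

class WrappingExecutorDemo {
    // Hypothetical job with state the handler reads.
    static class Job implements Callable<Integer> {
        final String id;
        Job(String id) { this.id = id; }
        @Override public Integer call() { throw new IllegalStateException("boom"); }
    }

    static String run() {
        AtomicReference<String> seen = new AtomicReference<>();
        WrappingExecutor pool = new WrappingExecutor(1,
                (task, ex) -> seen.set(((Job) task).id + " failed: " + ex.getMessage()));
        Future<Integer> f = pool.submit(new Job("job-7"));
        try {
            f.get();
        } catch (ExecutionException expected) {
            // already reported through onFailure
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        pool.shutdown();
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The sketch leaves submit(Runnable, T), execute(...) and invokeAll/invokeAny untouched, so jobs entering through those methods are not wrapped.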

Upvotes: 1
