Norbert94
Norbert94

Reputation: 173

Spring Executorservice Error-Handling

I implemented the Spring-TaskExecutor (which is the equivalent of the JDK 1.5's Executor.) to process notifications notifications receiving from external systems.

Interface with only one method:

 public interface AsynchronousService {
    void executeAsynchronously(Runnable task);
}

and the corresponding implementation:

public class AsynchronousServiceImpl implements AsynchronousService {

    private TaskExecutor taskExecutor;

    @Override
    public void executeAsynchronously(Runnable task) {
        taskExecutor.execute(task);
    }

    @Required
    public void setTaskExecutor(TaskExecutor taskExecutor) {
        this.taskExecutor = taskExecutor;
    }
}

Xml-configuration of the task executor (legacy application):

<bean id="taskExecutor" class="org.example.impl.NotificationPool">
        <property name="corePoolSize" value="1"/>
        <property name="maxPoolSize" value="1"/>
        <property name="queueCapacity" value="100"/>
        <property name="WaitForTasksToCompleteOnShutdown" value="true"/>
</bean>

1 is set for both, corePoolSize and maxPoolSize, since I want the tasks to execute sequentially (only 1 thread is created by the pool which process the tasks).

I want to order my task based on the date when i received the notification, therefore I need to override this function to allow priority ordering:

public class NotificationPool extends ThreadPoolTaskExecutor {
     @Override
     protected BlockingQueue<Runnable> createQueue(int queueCapacity) {
          return new PriorityBlockingQueue<>(queueCapacity);
        }
    }

Here is the Notification task class:

public class NotificationTask implements Runnable, Comparable<NotificationTask> {

    private final NotificationService notificationService;
    private final Notification notification;

    public NotificationService(NotificationService notificationService, 
                               Notification notification) {
        this.notificationService = notificationService;
        this.notification = notification;
    }

    @Override
    public int compareTo(NotificationTask task) {
        return notification.getTimestamp().compareTo(task.getTimestamp());
    }

    @Override
    public void run() {
        notificationService.processNotification(notification);
    }
}

And this is how I execute it:

asynchronousService.executeAsynchronously(new NotificationTask (notificationService, notification));

Now, what if something goes wrong or how do you even know something gone wrong? For example if one of the tasks throws an exception? How do you handle that? I would like to log if exceptions are thrown.

I found an article (https://ewirch.github.io/2013/12/a-executor-is-not-a-thread.html) which suggests to override afterExecute() - method of the class: ThreadPoolExecutor. However, I'm currently using Spring's ThreadPoolTaskExecutor which doesn't have the beforeExecute() and afterExecute() callback method of Java's ThreadPoolExecutor.

I could extend the ThreadPoolTaskExecutor and override the initializeExecutor() method and create an instance of my custom ThreadPoolExecutor. But the problem is that the initializeExecutor method uses private fields of the ThreadPoolTaskExecutor.

Does someone has a better idea or a better approach.

Upvotes: 2

Views: 5016

Answers (3)

Talha Dilber
Talha Dilber

Reputation: 145

You can use formal Spring Async mechanism.

@EnableAsync
@Configuration
public class AsyncConfiguration implements AsyncConfigurer {

   @Bean("taskExecutor")
public ThreadPoolTaskExecutor taskExecutor() {
    return new ThreadPoolTaskExecutor();
}

@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
    return new SimpleAsyncUncaughtExceptionHandler(); // U can customize
}

Usage:

public class BlaBla {

@Async("taskExecutor")
public void doSomething() {
   ...
   }
}

Upvotes: 0

Talha Dilber
Talha Dilber

Reputation: 145

You can easly use like this.

ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor() {
        @Override
        public void execute(@NotNull Runnable task) {
            try {
                super.execute(task);
            } catch (Throwable e) {
                log.error(e.getMessage(), e);
            }
        }
    };

Upvotes: 0

Mạnh Quyết Nguyễn
Mạnh Quyết Nguyễn

Reputation: 18235

The Spring doc said that

For an alternative, you may set up a ThreadPoolExecutor instance directly using constructor injection, or use a factory method definition that points to the Executors class. To expose such a raw Executor as a Spring TaskExecutor, simply wrap it with a ConcurrentTaskExecutor adapter.

But I didn't see any constructor related to ThreadPoolExecutor where we can inject, so it's probably a myth or they alrady removed the feature.

Fortunately we have ThreadPoolExecutorFactoryBean to rescue:

If you prefer native ExecutorService exposure instead, consider ThreadPoolExecutorFactoryBean as an alternative to this class.

This executor expose a method which we can customize the threadpool:

public class NotificationPool extends ThreadPoolExecutorFactoryBean {
     @Override
     protected BlockingQueue<Runnable> createQueue(int queueCapacity) {
          return new PriorityBlockingQueue<>(queueCapacity);
     }

     @Override
     protected ThreadPoolExecutor createExecutor(int corePoolSize, int maxPoolSize, int keepAliveSeconds, BlockingQueue<Runnable> queue, ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {
        return new YourCustomThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveSeconds, TimeUnit.SECONDS, queue, threadFactory, rejectedExecutionHandler);
     }
}

And override the default afterExecute callback:

public class YourCustomThreadPoolExecutor extends ThreadPoolExecutor {

    public YourCustomThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        // Here do something with your exception
    }
}

Upvotes: 3

Related Questions