Reputation: 173
I implemented the Spring-TaskExecutor (which is the equivalent of the JDK 1.5's Executor.) to process notifications notifications receiving from external systems.
Interface with only one method:
public interface AsynchronousService {
void executeAsynchronously(Runnable task);
}
and the corresponding implementation:
public class AsynchronousServiceImpl implements AsynchronousService {
private TaskExecutor taskExecutor;
@Override
public void executeAsynchronously(Runnable task) {
taskExecutor.execute(task);
}
@Required
public void setTaskExecutor(TaskExecutor taskExecutor) {
this.taskExecutor = taskExecutor;
}
}
Xml-configuration of the task executor (legacy application):
<bean id="taskExecutor" class="org.example.impl.NotificationPool">
<property name="corePoolSize" value="1"/>
<property name="maxPoolSize" value="1"/>
<property name="queueCapacity" value="100"/>
<property name="WaitForTasksToCompleteOnShutdown" value="true"/>
</bean>
1 is set for both, corePoolSize and maxPoolSize, since I want the tasks to execute sequentially (only 1 thread is created by the pool which process the tasks).
I want to order my task based on the date when i received the notification, therefore I need to override this function to allow priority ordering:
public class NotificationPool extends ThreadPoolTaskExecutor {
@Override
protected BlockingQueue<Runnable> createQueue(int queueCapacity) {
return new PriorityBlockingQueue<>(queueCapacity);
}
}
Here is the Notification task class:
public class NotificationTask implements Runnable, Comparable<NotificationTask> {
private final NotificationService notificationService;
private final Notification notification;
public NotificationService(NotificationService notificationService,
Notification notification) {
this.notificationService = notificationService;
this.notification = notification;
}
@Override
public int compareTo(NotificationTask task) {
return notification.getTimestamp().compareTo(task.getTimestamp());
}
@Override
public void run() {
notificationService.processNotification(notification);
}
}
And this is how I execute it:
asynchronousService.executeAsynchronously(new NotificationTask (notificationService, notification));
Now, what if something goes wrong or how do you even know something gone wrong? For example if one of the tasks throws an exception? How do you handle that? I would like to log if exceptions are thrown.
I found an article (https://ewirch.github.io/2013/12/a-executor-is-not-a-thread.html) which suggests to override afterExecute()
- method of the class: ThreadPoolExecutor
. However, I'm currently using Spring's ThreadPoolTaskExecutor
which doesn't have the beforeExecute()
and afterExecute()
callback method of Java's ThreadPoolExecutor
.
I could extend the ThreadPoolTaskExecutor
and override the initializeExecutor()
method and create an instance of my custom ThreadPoolExecutor
. But the problem is that the initializeExecutor
method uses private fields of the ThreadPoolTaskExecutor
.
Does someone has a better idea or a better approach.
Upvotes: 2
Views: 5016
Reputation: 145
You can use formal Spring Async mechanism.
@EnableAsync
@Configuration
public class AsyncConfiguration implements AsyncConfigurer {
@Bean("taskExecutor")
public ThreadPoolTaskExecutor taskExecutor() {
return new ThreadPoolTaskExecutor();
}
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return new SimpleAsyncUncaughtExceptionHandler(); // U can customize
}
Usage:
public class BlaBla {
@Async("taskExecutor")
public void doSomething() {
...
}
}
Upvotes: 0
Reputation: 145
You can easly use like this.
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor() {
@Override
public void execute(@NotNull Runnable task) {
try {
super.execute(task);
} catch (Throwable e) {
log.error(e.getMessage(), e);
}
}
};
Upvotes: 0
Reputation: 18235
The Spring doc said that
For an alternative, you may set up a ThreadPoolExecutor instance directly using constructor injection, or use a factory method definition that points to the Executors class. To expose such a raw Executor as a Spring TaskExecutor, simply wrap it with a ConcurrentTaskExecutor adapter.
But I didn't see any constructor related to ThreadPoolExecutor
where we can inject, so it's probably a myth or they alrady removed the feature.
Fortunately we have ThreadPoolExecutorFactoryBean
to rescue:
If you prefer native ExecutorService exposure instead, consider ThreadPoolExecutorFactoryBean as an alternative to this class.
This executor expose a method which we can customize the threadpool:
public class NotificationPool extends ThreadPoolExecutorFactoryBean {
@Override
protected BlockingQueue<Runnable> createQueue(int queueCapacity) {
return new PriorityBlockingQueue<>(queueCapacity);
}
@Override
protected ThreadPoolExecutor createExecutor(int corePoolSize, int maxPoolSize, int keepAliveSeconds, BlockingQueue<Runnable> queue, ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {
return new YourCustomThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveSeconds, TimeUnit.SECONDS, queue, threadFactory, rejectedExecutionHandler);
}
}
And override the default afterExecute
callback:
public class YourCustomThreadPoolExecutor extends ThreadPoolExecutor {
public YourCustomThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
// Here do something with your exception
}
}
Upvotes: 3