Reputation: 691
I have a application where I have multiple threads reading messages from a jms destination. The listener thread reads the message, makes some changes to it and calls several other methods of different classes. These methods are annotated with @Async
annotation that all the methods gets executed in parallel using a custom ThreadPoolTaskExecutor
.
@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(corePoolSize);
executor.setMaxPoolSize(maxPoolSize);
executor.setQueueCapacity(queueCapacity);
executor.setKeepAliveSeconds(keepAliveSeconds);
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.setTaskDecorator(new LoggingTaskDecorator());
executor.initialize();
return executor;
}
Until now all the messages were considered to be of equal priority everything was fine, as all messages were going into LinkedBlockingQueue
if none of the Executor
threads left available.
Now, there comes a requirement where a particular type of message read from the queue is expected to be given higher priority than any other message read from the queue.
Currently, I am using "org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor", which doesn't provide any method where I can set Priority Queue as my Blocking queue implementation.
Could, you please help me solve this scenario? Or is that the existing design of the system could not accommodate this change? Or what could be the best solution to handle such scenarios?
Thanks !
Upvotes: 2
Views: 1987
Reputation: 10218
As far as I am aware, it's not possible to prioritize tasks in combination with @Async
. (The previous answer only shows how to set up a TaskExecutor
with a priority queue, but not how to specify the priorities. On its own, that would result in ClassCastException
because the Runnables
used by Spring for @Async
methods are not comparable.)
Instead, I would use an explicit Executor
:
executor = new ThreadPoolExecutor(1, 1, 0, MILLISECONDS, new PriorityBlockingQueue<>());
Then, instead of an @Async
method, define a regular method that submits a suitable task to this executor:
var task = new CustomTask(/* stuff needed to run the task and determine the priority */);
executor.execute(task);
// if the method is supposed to return a Future:
return task;
The last piece is a CustomTask
class that implements Runnable
and Comparable
. If the method is supposed to return a Future
, the class should implement RunnableFuture
instead of just Runnable
. A simple implementation could inherit from FutureTask
:
class CustomTask extends FutureTask</* result type */> implements Comparable<CustomTask> {
CustomTask(/* stuff */) {
super(() -> /* do work */);
}
public int compareTo(CustomTask other) {
/* return -1 if this task has a higher priority than the other */
}
}
Upvotes: 0
Reputation: 125212
By simply overriding the createQueue
method. Also you should use an @Bean
method to create an instance of the bean, that way Spring can properly manage the lifecycle, a small but important thing (else shutdown wouldn't work properly)..
@Override
public Executor getAsyncExecutor() {
return taskExecutor();
}
@Bean
public ThreadPoolTaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor() {
protected BlockingQueue<Runnable> createQueue(int queueCapacity) {
return new PriorityBlockingQueue<>(queueCapacity);
}
};
executor.setCorePoolSize(corePoolSize);
executor.setMaxPoolSize(maxPoolSize);
executor.setQueueCapacity(queueCapacity);
executor.setKeepAliveSeconds(keepAliveSeconds);
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.setTaskDecorator(new LoggingTaskDecorator());
return executor;
}
Something like this should work. The createQueue
method now creates a PriorityBlockingQueue
instead of the default LinkedBlockingQueue
.
Upvotes: 0