Rajesh Jose
Rajesh Jose

Reputation: 342

Threadpool executor with priority tasks and avoid starvation

For my use case, I need an executor which can execute tasks based on priority. Simple way to achieve this is by using a thread pool with PriorityBlockingQueue and override newTaskFor() to return custom future tasks that are comparable based on priority of the task.

//Define priorities
public enum Priority {
    HIGH, MEDIUM, LOW, VERYLOW;
}

A priority task

//A Callable tasks that has priority. Concrete implementation will implement 
//call() to do actual work and getPriority() to return priority
public abstract class PriorityTask<V> implements Callable<V> {
    public abstract Priority getPriority ();
}

Actual executor implementation

public class PriorityTaskThreadPoolExecutor <V> {
    int _poolSize;
    private PriorityBlockingQueue<Runnable> _poolQueue = 
                                       new PriorityBlockingQueue<Runnable>(500); 
    private ThreadPoolExecutor _pool;

    public PriorityTaskThreadPoolExecutor (int poolSize) {
        _poolSize = poolSize;

        _pool = new ThreadPoolExecutor(_poolSize, _poolSize, 5, TimeUnit.MINUTES, 
                                      _poolQueue) {
                        //Override newTaskFor() to return wrap PriorityTask 
                        //with a PriorityFutureTaskWrapper.
                        @Override
                        protected <V> RunnableFuture<V> newTaskFor(Callable<V> c) {
                            return new PriorityFutureTaskWrapper<V>((PriorityTask<V>) c);
                        }
                };

        _pool.allowCoreThreadTimeOut(true);
    }

    public Future<V> submit (PriorityTask<V> task) {
        return _pool.submit(task);
    }

}

//A future task that wraps around the priority task to be used in the queue
class PriorityFutureTaskWrapper<V> extends FutureTask<V> 
                             implements Comparable <PriorityFutureTaskWrapper<V>> {
    PriorityTask<V> _priorityTask;

    public PriorityFutureTaskWrapper (PriorityTask<V> priorityTask) {
        super(priorityTask);
        _priorityTask = priorityTask;
    }

    public PriorityTask<V> getPriorityTask () {
        return _priorityTask;
    }

    @Override
    public int compareTo(PriorityFutureTaskWrapper<V> o) {
        return _priorityTask.getPriority().ordinal() - 
               o.getPriorityTask().getPriority().ordinal();
    }
}

Problem with this is, in my usecase, there is a potential that low priority tasks may starve forever. I want to avoid this. I could not find a clean way to do this using the executors/pools available in java. So I am thinking of writing my own executor. I have two different approaches.

1) A custom thread pool with PriorityBlockingQueue. There will be a separate thread, checking the tasks age in the queue. Older tasks will be removed and re-added with escalated priority.

2) My use case will have only a limited number of priorities say 1-4. I will have 4 different queues for each priority. Now the threads in the custom pool, instead of blocking on the queues, will scan the queues in the following order when it has to pick up the next task.

40% threads - Q1, Q2, Q3, Q4

30% threads - Q2, Q1, Q3, Q4

20% threads - Q3, Q1, Q2, Q4

10% threads - Q4, Q1, Q2, Q3

Scanning will be done by a thread when it is notified about a new addition in the queue or when the current task executed by that thread is completed. Other times, threads will be waiting. But still, scanning will be little more inefficient compared to blocking on the queue.

Apprach 2 is more suitable for my usecase.

Has any one tried any of these approaches or a different approach for similar usecase? Any thought/suggestions?

Upvotes: 1

Views: 3561

Answers (1)

ravthiru
ravthiru

Reputation: 9633

There is no easy way to change priority of an already inserted element in the PriorityQueue , It is already discussed here

Your second option should be simple to implement , like process more tasks from high priority queue than from low priority queue

You can also consider having different ThreadPools for each priority and number of threads in each pool depends on priority of the tasks.

Upvotes: 1

Related Questions