Reputation: 46422
I've got a bunch of repeating tasks to schedule. They query the database to find out what to do and then execute some action like updating statistics, sending emails, or fetching and importing files. Currently there are maybe ten of them, and this number is expected to grow a lot. I'm not given any timing constraints; actually, it's my job to choose an algorithm so that nobody complains. :D
Currently, I'm using an ad-hoc combination of threads and periodically scheduled tasks like
It seems to work well at the moment, but it's not future-proof and it doesn't feel right for these reasons:
(*) It's primarily a web server and serving requests is actually the highest priority. Getting a separate server wouldn't help, as the bottleneck is usually the database. Currently, it works fine, but I'm looking for a better solution as we hope that the load grows by a factor of 100 in a year or two.
My idea is to increase the priority of a job when it has been delayed too much. For example, there are statistics running hourly; delaying them by a few hours is no big deal, but it shouldn't be a whole day and it mustn't be a whole week.
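A rough sketch of what I mean, with purely illustrative names (this isn't my actual code):

// Sketch only: Job, basePriority and scheduledTime are illustrative names.
// The longer a job is overdue, the more important it becomes.
final class Job {
    final int basePriority;   // smaller = more important
    final long scheduledTime; // when the job was supposed to run, epoch millis

    Job(int basePriority, long scheduledTime) {
        this.basePriority = basePriority;
        this.scheduledTime = scheduledTime;
    }

    // Effective priority improves (gets smaller) by one level per hour of delay,
    // so an hourly statistics job eventually outranks fresher work.
    int effectivePriority(long now) {
        long hoursLate = Math.max(0, (now - scheduledTime) / 3_600_000L);
        return (int) (basePriority - hoursLate);
    }
}

// The scheduler would then pick the waiting job with the smallest
// effectivePriority(System.currentTimeMillis()) whenever a thread frees up.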
I'd be happy to replace all my AbstractExecutionThreadServices and AbstractScheduledServices with something working as follows:
This surely sounds pretty fuzzy, and making it more precise is part of what I'm asking. My competing goals are
There are no hard deadlines and there's no need to minimize the number of threads used. I don't insist on a solution that does exactly what I described; I'm not looking for a library (nor do I insist on reinventing the wheel). I don't think that a cron-like scheduler is the right solution.
Upvotes: 5
Views: 251
Reputation: 10931
Working with the ExecutorService model, the classic solution to reordering executor tasks is to create a ThreadPoolExecutor with a PriorityBlockingQueue feeding it the tasks - as described here.
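A minimal sketch of that classic pattern might look like this (PrioritizedTask is just an illustrative wrapper; note that tasks have to go in via execute(), because submit() wraps them in a FutureTask that the comparator can't cast):

import java.util.Comparator;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PriorityExecutorSketch {

    // Illustrative wrapper: a Runnable carrying an explicit priority (lower runs first).
    static class PrioritizedTask implements Runnable {
        final int priority;
        final Runnable delegate;

        PrioritizedTask(int priority, Runnable delegate) {
            this.priority = priority;
            this.delegate = delegate;
        }

        @Override
        public void run() {
            delegate.run();
        }
    }

    public static void main(String[] args) {
        // The queue hands out the highest-priority waiting task first; the cast is
        // safe as long as only PrioritizedTasks are passed to execute().
        Comparator<Runnable> byPriority =
                Comparator.comparingInt(r -> ((PrioritizedTask) r).priority);
        PriorityBlockingQueue<Runnable> queue = new PriorityBlockingQueue<>(11, byPriority);

        ThreadPoolExecutor executor =
                new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, queue);

        // While the single worker is busy, queued tasks are drained in priority order.
        executor.execute(new PrioritizedTask(2, () -> System.out.println("low")));
        executor.execute(new PrioritizedTask(1, () -> System.out.println("high")));

        executor.shutdown();
    }
}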
However, needing to schedule the tasks as well puts a spin on it. ScheduledThreadPoolExecutor uses an internal custom BlockingQueue to feed the tasks in when the schedule is ready but, as I think you're well aware, it's not easily open to further customisation.
At a glance, DelayQueue looks like it fits the bill perfectly - it can prioritise the next Delayed element or task, and it lets Delayed.getDelay() make a late decision about whether a task is ready to go.
The fly in the ointment with this plan is when you try to pass something like DelayQueue<DelayedRunnable> into the constructor of ThreadPoolExecutor. That constructor will only accept a BlockingQueue<Runnable>, not a BlockingQueue<? extends Runnable>.
One way out of this is to create a minimal implementation of BlockingQueue<Runnable> that delegates to the DelayQueue. The basics are here:
import java.util.AbstractQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.TimeUnit;

// Adapts a DelayQueue<DelayedRunnable> to the BlockingQueue<Runnable>
// interface that ThreadPoolExecutor's constructor requires.
public class BlockingDelayQueue extends AbstractQueue<Runnable>
        implements BlockingQueue<Runnable> {

    private final DelayQueue<DelayedRunnable> delayQueue;

    public BlockingDelayQueue(DelayQueue<DelayedRunnable> delayQueue) {
        this.delayQueue = delayQueue;
    }

    @Override
    public boolean isEmpty() {
        return delayQueue.isEmpty();
    }

    @Override
    public Runnable poll(long timeout, TimeUnit unit)
            throws InterruptedException {
        // Wait for a DelayedRunnable to become ready, then unwrap the task for the pool.
        DelayedRunnable delayedRunnable = delayQueue.poll(timeout, unit);
        if (delayedRunnable == null)
            return null;
        return delayedRunnable.getCommand();
    }

    ...
}
The experimental version of DelayedRunnable used to prove the idea there uses a simple Priority enum that checks the 'busyness' of the executor:
import java.util.concurrent.ThreadPoolExecutor;

// Each priority decides whether its task may start, based on how busy the pool is.
public enum Priority {
    LOW {
        boolean isReady(ThreadPoolExecutor executor) {
            // Low-priority tasks only run when the pool is idle.
            return executor.getActiveCount() == 0;
        }
    },
    MEDIUM {
        boolean isReady(ThreadPoolExecutor executor) {
            // Medium-priority tasks run when at most one task is active.
            return executor.getActiveCount() <= 1;
        }
    },
    HIGH {
        boolean isReady(ThreadPoolExecutor executor) {
            // High-priority tasks always run as soon as their time comes.
            return true;
        }
    };

    abstract boolean isReady(ThreadPoolExecutor executor);
}
Which DelayedRunnable.getDelay() can then check:
@Override
public long getDelay(TimeUnit unit) {
    long millis;
    if (!priority.isReady(executor))
        // Not ready yet: report a positive delay so the queue re-checks in ~1s.
        millis = 1000;
    else
        // Ready: normal countdown to the scheduled start time.
        millis = time - System.currentTimeMillis();
    return unit.convert(millis, TimeUnit.MILLISECONDS);
}
- so long as it doesn't return <= 0 while the priority isn't ready yet.
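For completeness, here is one plausible shape for the rest of DelayedRunnable, inferred from the snippets above; the field names and compareTo() are guesses, not the actual code from the GitHub project linked at the end:

import java.util.concurrent.Delayed;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Inferred sketch only - see the GitHub link below for the real class.
public class DelayedRunnable implements Delayed {

    private final Runnable command;
    private final long time;                   // target start time, epoch millis
    private final Priority priority;
    private final ThreadPoolExecutor executor; // consulted for its current load

    public DelayedRunnable(Runnable command, long time,
                           Priority priority, ThreadPoolExecutor executor) {
        this.command = command;
        this.time = time;
        this.priority = priority;
        this.executor = executor;
    }

    public Runnable getCommand() {
        return command;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        long millis;
        if (!priority.isReady(executor))
            millis = 1000;                     // not ready: make the queue look again later
        else
            millis = time - System.currentTimeMillis();
        return unit.convert(millis, TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        // DelayQueue hands out the element with the smallest remaining delay first.
        return Long.compare(getDelay(TimeUnit.MILLISECONDS),
                            other.getDelay(TimeUnit.MILLISECONDS));
    }
}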
This seemed to work well, e.g. launching standard 2s sleep tasks here...
DelayedScheduler scheduler = new DelayedScheduler();
scheduler.schedule(task("Low 1"), 1, TimeUnit.SECONDS, Priority.LOW);
scheduler.schedule(task("Low 2"), 2, TimeUnit.SECONDS, Priority.LOW);
scheduler.schedule(task("Low 3"), 3, TimeUnit.SECONDS, Priority.LOW);
scheduler.schedule(task("Medium 1"), 1, TimeUnit.SECONDS, Priority.MEDIUM);
scheduler.schedule(task("Medium 2"), 2, TimeUnit.SECONDS, Priority.MEDIUM);
scheduler.schedule(task("Medium 3"), 3, TimeUnit.SECONDS, Priority.MEDIUM);
scheduler.schedule(task("High 1"), 1, TimeUnit.SECONDS, Priority.HIGH);
scheduler.schedule(task("High 2"), 2, TimeUnit.SECONDS, Priority.HIGH);
scheduler.schedule(task("High 3"), 3, TimeUnit.SECONDS, Priority.HIGH);
... produced about the right results:
High 1 started at 1087ms
Medium 1 started at 1087ms
High 2 started at 2087ms
Medium 1 ended at 3087ms
High 1 ended at 3087ms
High 3 started at 3087ms
High 2 ended at 4088ms
Medium 2 started at 4088ms
High 3 ended at 5088ms
Medium 3 started at 5088ms
Medium 2 ended at 6088ms
Medium 3 ended at 7089ms
Low 1 started at 7089ms
Low 1 ended at 9089ms
Low 2 started at 9089ms
Low 2 ended at 11089ms
Low 3 started at 11089ms
Low 3 ended at 13089ms
- Medium priority tasks were allowed to start while at most one other task was running; Low priority tasks only ran once nothing else was going on.
(DelayedScheduler and the other unseen bits are on GitHub.)
Upvotes: 1
Reputation: 1046
I think you're pretty close to what you want; maybe a little encouragement/approval/agreement is all that's needed.
My thought would be: "If I know the max number of concurrent threads I can run, how will I share those across 3 thread queues?"
Once I know this I can set up 3 queues, each with a different share of the available threads:
- Priority 1 (highest) gets 50% of the work
- Priority 2 gets 35% of the work
- Priority 3 (lowest) gets 15% of the work
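A minimal sketch of that split, assuming a hypothetical overall budget of 20 worker threads:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Illustrative only: a 20-thread budget divided into three fixed pools
// according to the shares described above.
public class SharedPools {
    public static void main(String[] args) {
        ExecutorService highPriority   = Executors.newFixedThreadPool(10); // 50% of 20
        ExecutorService mediumPriority = Executors.newFixedThreadPool(7);  // 35% of 20
        ExecutorService lowPriority    = Executors.newFixedThreadPool(3);  // 15% of 20

        // Each task is submitted to the pool matching its priority, e.g.:
        highPriority.execute(() -> System.out.println("urgent job"));
        mediumPriority.execute(() -> System.out.println("routine job"));
        lowPriority.execute(() -> System.out.println("background job"));

        highPriority.shutdown();
        mediumPriority.shutdown();
        lowPriority.shutdown();
    }
}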
Upvotes: 0