Reputation: 31
I put together the simple test program below, which is supposed to execute some tasks in parallel. Each round submits 6 tasks and waits for all of them to complete; then the next set of tasks is submitted.
    import java.util.concurrent.*;

    public class ThreadExecutorTest {

        public static void main(String... args) {
            ThreadPoolExecutor ex = new ThreadPoolExecutor(15, 20, 10,
                    TimeUnit.SECONDS,
                    new ArrayBlockingQueue<Runnable>(5));
            for (int i = 0; i < 200; i++) {
                submitTasks(ex);
            }
            System.out.println("Done");
        }

        private static void submitTasks(ThreadPoolExecutor ex) {
            Future<Void> f1 = ex.submit(new SampleTask());
            Future<Void> f2 = ex.submit(new SampleTask());
            Future<Void> f3 = ex.submit(new SampleTask());
            Future<Void> f4 = ex.submit(new SampleTask());
            Future<Void> f5 = ex.submit(new SampleTask());
            Future<Void> f6 = ex.submit(new SampleTask());
            // System.out.println("Max Pool Size " + ex.getMaximumPoolSize());
            System.out.println("Pool Size " + ex.getPoolSize());
            // System.out.println("Active count " + ex.getActiveCount());
            // System.out.println("Task Count " + ex.getTaskCount());
            // System.out.println("Queue length " + ex.getQueue().size());
            // System.out.println("Queue remainingCapacity " + ((ArrayBlockingQueue)ex.getQueue()).remainingCapacity());
            try {
                f1.get();
            } catch (ExecutionException eex) {
                System.out.println("ExecutionException reported later - " + eex.getMessage());
            } catch (Exception exp) {
                System.out.println("Exception reported later - " + exp.getMessage());
            }
            try {
                f2.get();
            } catch (Exception exp) {}
            try {
                f3.get();
            } catch (Exception exp) {}
            try {
                f4.get();
            } catch (Exception exp) {}
            try {
                f5.get();
            } catch (Exception exp) {}
            try {
                f6.get();
            } catch (Exception exp) {}
        }

        static class SampleTask implements Callable<Void> {
            @Override
            public Void call() throws Exception {
                try {
                    // Thread.sleep(300);
                } catch (Exception e) {
                    System.out.println("Exception reported");
                }
                return null;
            }
        }
    }
However, it fails with the following exception, which I cannot explain. I assumed this ThreadPoolExecutor configuration was sufficient to handle 6 tasks at any given moment.
Pool Size 6
Pool Size 12
Pool Size 15
Pool Size 16
Pool Size 17
Pool Size 18
Pool Size 19
Pool Size 20
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@2328c243 rejected from java.util.concurrent.ThreadPoolExecutor@bebdb06[Running, pool size = 20, active threads = 0, queued tasks = 0, completed tasks = 53]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:134)
Upvotes: 3
Views: 1917
Reputation: 6178
ThreadPoolExecutor.execute has a comment describing how it behaves when a new task is submitted:
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task. The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread. If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
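To see the three steps in isolation, here is a minimal sketch of my own (not taken from the JDK sources). It assumes a deliberately tiny pool (core 1, max 2, queue capacity 1) and uses a hypothetical `blocker` task that waits on a latch, so no worker ever becomes free while the four tasks are submitted. The class name `ThreeStepsDemo` is made up for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreeStepsDemo {

    /** Submits four blocking tasks and logs the pool state after each submission. */
    static String run() {
        CountDownLatch release = new CountDownLatch(1);
        // Assumed illustrative sizes: core = 1, max = 2, queue capacity = 1.
        ThreadPoolExecutor ex = new ThreadPoolExecutor(
                1, 2, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1));

        // Every task blocks until the latch opens, so workers stay busy.
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };

        StringBuilder log = new StringBuilder();
        for (int i = 1; i <= 4; i++) {
            try {
                ex.execute(blocker);
                log.append("task ").append(i)
                   .append(": pool=").append(ex.getPoolSize())
                   .append(" queued=").append(ex.getQueue().size())
                   .append('\n');
            } catch (RejectedExecutionException e) {
                log.append("task ").append(i).append(": rejected\n");
            }
        }

        release.countDown(); // let the blocked tasks finish
        ex.shutdown();
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.print(run());
    }
}
```

Task 1 starts a core thread (step 1), task 2 is queued (step 2), task 3 finds the queue full and starts an extra thread (step 3), and task 4 is rejected because the queue is full and the pool is already at its maximum.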
In your case, you submit batches of 6 tasks at a time. While the current size of the pool is less than the core size, these submissions are immediately dispatched to new worker threads (see the jumps from 0 to 6, and from 6 to 12). The third batch brings the pool from 12 up to the core size of 15, and its remaining three tasks go to the queue.
Once the pool has reached the core size but is still below the maximum size, tasks are submitted to the queue and then pulled off asynchronously to run on an existing worker thread, as long as the queue is not full. Since the six tasks are submitted back-to-back, there is a high probability that all six are submitted before any is pulled off the queue. The queue holds only five, so the first five are queued and the sixth falls through to step 3 of the process described above: a new worker thread is created and that task runs on it immediately. (This explains the later jumps from 15 to 16, 16 to 17, and so on.)
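The arithmetic can be checked with a small model. This is a simplified sketch, not the executor itself: it assumes the worst case described above (no worker dequeues anything while a batch is being submitted), that the queue is empty at the start of each batch because the previous batch was awaited, and that no idle thread times out during the run. `BatchModel` and `simulate` are names I made up; a value of -1 marks a rejected task.

```java
import java.util.ArrayList;
import java.util.List;

public class BatchModel {
    // Values taken from the question's configuration.
    static final int CORE = 15, MAX = 20, QUEUE_CAPACITY = 5, BATCH = 6;

    /**
     * Returns the pool size after each fully accepted batch.
     * The list ends with -1 if a task is rejected.
     */
    static List<Integer> simulate(int batches) {
        List<Integer> trace = new ArrayList<>();
        int workers = 0;
        for (int b = 0; b < batches; b++) {
            int queued = 0; // previous batch was awaited, so the queue is empty
            for (int t = 0; t < BATCH; t++) {
                if (workers < CORE) {
                    workers++;              // step 1: start a core thread
                } else if (queued < QUEUE_CAPACITY) {
                    queued++;               // step 2: enqueue the task
                } else if (workers < MAX) {
                    workers++;              // step 3: start an extra thread
                } else {
                    trace.add(-1);          // saturated: task rejected
                    return trace;
                }
            }
            trace.add(workers);
        }
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(simulate(200));
    }
}
```

Under those assumptions the model yields 6, 12, 15, 16, 17, 18, 19, 20 and then a rejection on the ninth batch, which matches the pool sizes printed in the question. It also accounts for the 53 completed tasks in the exception message: 8 full batches of 6, plus the 5 tasks that were queued before the sixth was rejected.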
Eventually, the thread pool reaches its maximum number of worker threads. The next time step 3 is reached (as in the previous paragraph), the executor is unable to create a new worker and rejects the task. In essence, even though there are idle worker threads, you haven't given the executor any time to pull tasks off the queue and run them before over-filling the queue.
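One possible way around this, sketched here under the assumption that each batch is always awaited before the next one is submitted: make the queue at least as large as a batch (6 instead of 5), so that a whole batch fits in the queue even if no worker picks anything up in time. The class name `LargerQueueSketch` and the no-op task are placeholders of mine.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class LargerQueueSketch {

    /** Runs the given number of 6-task batches and returns how many tasks completed. */
    static int runBatches(int batches) {
        // Same pool as in the question, except the queue capacity is 6, not 5.
        ThreadPoolExecutor ex = new ThreadPoolExecutor(
                15, 20, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(6));
        Runnable noop = () -> { }; // stands in for SampleTask
        int completed = 0;
        try {
            for (int b = 0; b < batches; b++) {
                List<Future<?>> futures = new ArrayList<>();
                for (int t = 0; t < 6; t++) {
                    futures.add(ex.submit(noop));
                }
                // Wait for the whole batch; the queue is empty again afterwards.
                for (Future<?> f : futures) {
                    f.get();
                    completed++;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            ex.shutdown();
        }
        return completed;
    }

    public static void main(String[] args) {
        System.out.println("Completed " + runBatches(200));
    }
}
```

With this capacity, step 3 is never reached for these batches, so the pool should stay at the core size. If batch sizes are not known in advance, installing `ThreadPoolExecutor.CallerRunsPolicy` as the rejection handler is another option: a task that cannot be queued then runs on the submitting thread instead of being rejected.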
Upvotes: 2