Reputation: 291
Problem statement:
I have 1,000 tasks and need to process them via ThreadPoolTaskExecutor. The ThreadPoolTaskExecutor has corePoolSize = 5, maxPoolSize = 10 and queueCapacity = 1000.
Now, from the main method, I am executing the following code:
CountDownLatch latch = new CountDownLatch(5);
Collection<Future<?>> futures = new LinkedList<Future<?>>();
for (Map.Entry<String,Boolean> entry : map.entrySet()){
    FutureTask task = new FutureTask(new CustomTask(entry));
    executor.execute(task);
}
log.info("ACTIVE COUNT : "+executor.getActiveCount());
log.info("SIZE of the QUEUE : "+executor.getThreadPoolExecutor().getQueue().size());
log.info("LATCH WAIT : "+latch.getCount());
latch.wait();
.....
@Override
public Object call() throws Exception {
    latch.countDown();
    //some logic
    return entry;
}
Now, the map has 1,000 entries in it, and I want to process all 1,000 queued tasks and only then print these log lines. What is happening is that the executor creates corePoolSize threads (equal to the CountDownLatch count) and executes them right away. Once this number is reached, it starts filling up the queue (which is totally fine and desired). However, the queued tasks are processed ONLY AFTER the main thread reaches the end; only then do these tasks start executing. This is something that I don't want. I want the executor to start picking up items from the queue as soon as threads are freed from processing batch 1.
But in my case, once batch 1 is processed, the next task is picked up only when the main thread ends (which I do not want).
Does anyone have a solution for how this can be achieved? (That is, processing the queue as soon as a thread is available.)
P.S.: I do understand that latch.await() waits for the threads to complete their execution, but I am looking for behavior in which it waits for all the threads to finish (which is happening) and for the queue to be empty (my expectation).
Thank You
Upvotes: 0
Views: 703
Reputation: 718986
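If you are going to do it this way, you need to initialize the latch with the number of tasks that you are going to submit, i.e. 1,000. With a count of 5, the latch reaches zero as soon as the first five tasks have started, so the main thread stops waiting long before the queue is drained. You should also decrement the latch at the end of each task, not at its start (as your code currently seems to be doing).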
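The latch-based variant described above could look roughly like the following. This is only a sketch: the class name `LatchPerTaskDemo`, the method `runAll`, the plain five-thread `Executors.newFixedThreadPool` pool, the one-minute timeout and the counter that stands in for the real task logic are all assumptions made for illustration, not part of the original code.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; the names here are illustrative only.
class LatchPerTaskDemo {

    // Runs taskCount trivial tasks on a 5-thread pool; returns how many completed.
    static int runAll(int taskCount) {
        ExecutorService pool = Executors.newFixedThreadPool(5);
        CountDownLatch latch = new CountDownLatch(taskCount); // one count per task
        AtomicInteger completed = new AtomicInteger();
        try {
            for (int i = 0; i < taskCount; i++) {
                pool.execute(() -> {
                    try {
                        completed.incrementAndGet(); // stand-in for the real work
                    } finally {
                        latch.countDown(); // decrement last, even if the work throws
                    }
                });
            }
            // await() blocks until the count reaches zero or the timeout expires
            if (!latch.await(1, TimeUnit.MINUTES)) {
                throw new IllegalStateException("Tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            throw new IllegalStateException("Interrupted while waiting", e);
        } finally {
            pool.shutdown(); // lets the worker threads exit so the JVM can stop
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println("Completed tasks: " + runAll(1000));
    }
}
```

Putting `countDown()` in a `finally` block means a task that throws still releases its count, so the waiting thread cannot hang on a failed task.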
But you don't need a latch or a counter or anything to implement this. Instead, if you are using a Java SE ExecutorService directly, just do this:
public static void main(String[] args) {
    // Submit lots of tasks
    executorService.shutdown();
    try {
        // Waits until all tasks in the queue have completed
        executorService.awaitTermination(1_000_000, TimeUnit.SECONDS);
    } catch (InterruptedException ex) {
        // OK ... will end now
    }
}
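For reference, a self-contained version of the shutdown-then-await pattern might look like this. It is a sketch under assumptions: the class name `ShutdownAwaitDemo`, the method `processAll`, the 60-second keep-alive, the one-minute timeout and the counter that stands in for real work are all made up for illustration. The pool settings (core 5, max 10, queue capacity 1000) are taken from the question.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; the names here are illustrative only.
class ShutdownAwaitDemo {

    // Submits taskCount trivial tasks, then blocks until the pool has drained its queue.
    static int processAll(int taskCount) {
        // Mirrors the question's settings: core 5, max 10, queue capacity 1000
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                5, 10, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1000));
        AtomicInteger processed = new AtomicInteger();
        for (int i = 0; i < taskCount; i++) {
            pool.execute(() -> processed.incrementAndGet()); // stand-in for the real work
        }
        pool.shutdown(); // no new tasks are accepted; already-queued tasks still run
        try {
            // Blocks until every submitted task has finished or the timeout expires
            if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
                throw new IllegalStateException("Tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            throw new IllegalStateException("Interrupted while waiting", e);
        }
        return processed.get();
    }

    public static void main(String[] args) {
        System.out.println("Processed tasks: " + processAll(1000));
    }
}
```

Checking the boolean returned by `awaitTermination` distinguishes "all tasks finished" from "the timeout elapsed first", which the bare call does not do.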
And if you are using the Spring Framework-specific ThreadPoolTaskExecutor class:
public static void main(String[] args) {
    // Submit lots of tasks
    executor.setAwaitTerminationSeconds(1_000_000);
    executor.setWaitForTasksToCompleteOnShutdown(true);
    executor.shutdown();
}
Upvotes: 2