Reputation: 1483
I have a ThreadPoolExecutor
as such
ThreadPoolExecutor executor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<>());
The tasks are executed as follows
executor.execute(task)
Now each task may also execute more tasks to the same executor and those new tasks can submit more tasks
The problem is I want the main thread to wait until all tasks are executed and then call shutdown
Is the following approach guaranteed to work? (i.e. block/wait main thread until all tasks are completed)
while (executor.getCompletedTaskCount() < executor.getTaskCount()) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
LOGGER.error("Exception in atomic Count wait thread sleep", e);
break;
}
}
}
Will this eventually break out of loop? Just by preliminary testing, I found that it works even with exceptions in thread
P.S. I cannot use latch, because I don't know the number of tasks beforehand nor the accepted answer here
Upvotes: 1
Views: 876
Reputation: 12346
You should probably keep the futures that get submitted.
Deque<Future<?>> futures = new ConcurrentLinkedDeque<>();
Then everytime you submit a task.
futures.add(executor.submit( runnable, "Doesn't Really Matter, but Can be Useful"));
Then in your main thread that is waiting.
while(futures.size()>0){
futures.pop().get();
}
This will offer you a guarantee that .get
will not complete until a task has finished, and if more tasks are added by another task then futures will reflect the change before the original task completes.
Upvotes: 2
Reputation: 925
The methods used to get the completed count and submitted count i.e. executor.getCompletedTaskCount() & executor.getTaskCount()
do not always provide a 100% accurate count as per the Java (8) docs, so the approach may not work always.
public long getTaskCount()
Returns the approximate total number of tasks that have ever been scheduled for execution. Because the states of tasks and threads may change dynamically during computation, the returned value is only an approximation.
public long getCompletedTaskCount()
Returns the approximate total number of tasks that have completed execution. Because the states of tasks and threads may change dynamically during computation, the returned value is only an approximation, but one that does not ever decrease across successive calls.
Upvotes: 0
Reputation: 21
In my opinion it will be non-deterministic to get the actual count of tasks for the reason that while the tasks are submitted the execute method is called and one of below 3 conditions may happen. 1. Task starts executing (added to Workers) 2. Task is enqueued (added to WorkQueue) 3. Task is rejected as WorkerQueue capacity,Workers capacity and resources exhaust
/**
* Executes the given task sometime in the future. The task
* may execute in a new thread or in an existing pooled thread.
*
* If the task cannot be submitted for execution, either because this
* executor has been shutdown or because its capacity has been reached,
* the task is handled by the current {@code RejectedExecutionHandler}.
*
* @param command the task to execute
* @throws RejectedExecutionException at discretion of
* {@code RejectedExecutionHandler}, if the task
* cannot be accepted for execution
* @throws NullPointerException if {@code command} is null
*/
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
getTaskCount() and getCompletedTaskCount() methods are guarded by mainLock hence we do know if internal threads still submitting tasks to executor will be done by the time check (while (executor.getCompletedTaskCount() < executor.getTaskCount())
) in main executes. This condition may result is false positive for a moment ending into a wrong result.
/**
* Returns the approximate total number of tasks that have ever been
* scheduled for execution. Because the states of tasks and
* threads may change dynamically during computation, the returned
* value is only an approximation.
*
* @return the number of tasks
*/
public long getTaskCount() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
long n = completedTaskCount;
for (Worker w : workers) {
n += w.completedTasks;
if (w.isLocked())
++n;
}
return n + workQueue.size();
} finally {
mainLock.unlock();
}
}
/**
* Returns the approximate total number of tasks that have
* completed execution. Because the states of tasks and threads
* may change dynamically during computation, the returned value
* is only an approximation, but one that does not ever decrease
* across successive calls.
*
* @return the number of tasks
*/
public long getCompletedTaskCount() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
long n = completedTaskCount;
for (Worker w : workers)
n += w.completedTasks;
return n;
} finally {
mainLock.unlock();
}
}
**Code Snippets used here are from JDK 1.8 222
Upvotes: 1