r-sniper
r-sniper

Reputation: 1483

ThreadPoolExecutor dynamic task execution, wait until all task completion

I have a ThreadPoolExecutor as such

ThreadPoolExecutor executor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<>());

The tasks are executed as follows

executor.execute(task)

Now each task may also execute more tasks to the same executor and those new tasks can submit more tasks

The problem is I want the main thread to wait until all tasks are executed and then call shutdown

Is the following approach guaranteed to work? (i.e. block/wait main thread until all tasks are completed)

while (executor.getCompletedTaskCount() < executor.getTaskCount()) {
      try {
        Thread.sleep(100);
      } catch (InterruptedException e) {
        LOGGER.error("Exception in atomic Count wait thread sleep", e);
        break;
      }
    }
}

Will this eventually break out of loop? Just by preliminary testing, I found that it works even with exceptions in thread

P.S. I cannot use latch, because I don't know the number of tasks beforehand nor the accepted answer here

Upvotes: 1

Views: 876

Answers (3)

matt
matt

Reputation: 12346

You should probably keep the futures that get submitted.

Deque<Future<?>> futures = new ConcurrentLinkedDeque<>();

Then everytime you submit a task.

futures.add(executor.submit( runnable, "Doesn't Really Matter, but Can be Useful"));

Then in your main thread that is waiting.

while(futures.size()>0){
    futures.pop().get();
}

This will offer you a guarantee that .get will not complete until a task has finished, and if more tasks are added by another task then futures will reflect the change before the original task completes.

Upvotes: 2

A_C
A_C

Reputation: 925

The methods used to get the completed count and submitted count i.e. executor.getCompletedTaskCount() & executor.getTaskCount() do not always provide a 100% accurate count as per the Java (8) docs, so the approach may not work always.

public long getTaskCount()

Returns the approximate total number of tasks that have ever been scheduled for execution. Because the states of tasks and threads may change dynamically during computation, the returned value is only an approximation.

public long getCompletedTaskCount()

Returns the approximate total number of tasks that have completed execution. Because the states of tasks and threads may change dynamically during computation, the returned value is only an approximation, but one that does not ever decrease across successive calls.

Upvotes: 0

MaDsAm
MaDsAm

Reputation: 21

In my opinion it will be non-deterministic to get the actual count of tasks for the reason that while the tasks are submitted the execute method is called and one of below 3 conditions may happen. 1. Task starts executing (added to Workers) 2. Task is enqueued (added to WorkQueue) 3. Task is rejected as WorkerQueue capacity,Workers capacity and resources exhaust

 /**
     * Executes the given task sometime in the future.  The task
     * may execute in a new thread or in an existing pooled thread.
     *
     * If the task cannot be submitted for execution, either because this
     * executor has been shutdown or because its capacity has been reached,
     * the task is handled by the current {@code RejectedExecutionHandler}.
     *
     * @param command the task to execute
     * @throws RejectedExecutionException at discretion of
     *         {@code RejectedExecutionHandler}, if the task
     *         cannot be accepted for execution
     * @throws NullPointerException if {@code command} is null
     */
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

getTaskCount() and getCompletedTaskCount() methods are guarded by mainLock hence we do know if internal threads still submitting tasks to executor will be done by the time check (while (executor.getCompletedTaskCount() < executor.getTaskCount()) ) in main executes. This condition may result is false positive for a moment ending into a wrong result.

/**
     * Returns the approximate total number of tasks that have ever been
     * scheduled for execution. Because the states of tasks and
     * threads may change dynamically during computation, the returned
     * value is only an approximation.
     *
     * @return the number of tasks
     */
    public long getTaskCount() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            long n = completedTaskCount;
            for (Worker w : workers) {
                n += w.completedTasks;
                if (w.isLocked())
                    ++n;
            }
            return n + workQueue.size();
        } finally {
            mainLock.unlock();
        }
    }
    /**
     * Returns the approximate total number of tasks that have
     * completed execution. Because the states of tasks and threads
     * may change dynamically during computation, the returned value
     * is only an approximation, but one that does not ever decrease
     * across successive calls.
     *
     * @return the number of tasks
     */
    public long getCompletedTaskCount() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            long n = completedTaskCount;
            for (Worker w : workers)
                n += w.completedTasks;
            return n;
        } finally {
            mainLock.unlock();
        }
    }

**Code Snippets used here are from JDK 1.8 222

Upvotes: 1

Related Questions