Gandalf

Reputation: 9855

Override execute() in ThreadPoolExecutor

I'm looking for a ThreadPoolExecutor that will block when its task queue is full. The current Java implementation rejects new tasks if the underlying queue is full:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

Would changing this line:

 if (isRunning(c) && workQueue.offer(command)) {

to

 if (isRunning(c) && workQueue.put(command)) {

do the trick? Am I missing something?

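SOLUTION (might help the next person):

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BlockingThreadPoolExecutor extends ThreadPoolExecutor {

    // Limits how many tasks may be inside run() at the same time.
    private final Semaphore runLock;

    public BlockingThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
            long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, int maxTasks) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        runLock = new Semaphore(maxTasks);
    }

    // Runs on the worker thread just before the task starts, so it is the
    // worker, not the thread calling execute(), that waits for a permit.
    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        runLock.acquireUninterruptibly();
        super.beforeExecute(t, r);
    }

    // Runs on the worker thread after the task finishes, even if it threw.
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        runLock.release();
    }

}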
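
One caveat with the class above: beforeExecute runs on the worker thread, so the semaphore there caps how many tasks run at once rather than making execute() wait. Below is a minimal sketch of a variant that takes the permit in execute() itself, so the submitting thread is the one that blocks. The class names, the fixed pool size and the unbounded queue are assumptions made for illustration; none of this is a JDK class.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical name; a sketch, not part of the JDK.
class SubmitBlockingThreadPoolExecutor extends ThreadPoolExecutor {

    // One permit per task that is currently queued or running.
    private final Semaphore permits;

    SubmitBlockingThreadPoolExecutor(int poolSize, int maxTasks) {
        // Unbounded queue: the semaphore, not the queue, provides the bound.
        super(poolSize, poolSize, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
        permits = new Semaphore(maxTasks);
    }

    @Override
    public void execute(Runnable command) {
        // The submitting thread waits here until a permit is free.
        permits.acquireUninterruptibly();
        try {
            super.execute(command);
        } catch (RuntimeException | Error e) {
            // The task was not accepted (e.g. pool shut down): return the permit.
            permits.release();
            throw e;
        }
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        // The task has finished, normally or not: free its slot.
        permits.release();
    }
}

// Hypothetical demo: shows that a third submission waits while two are outstanding.
class SubmitBlockingDemo {
    static boolean thirdSubmitWaits() {
        SubmitBlockingThreadPoolExecutor pool = new SubmitBlockingThreadPoolExecutor(1, 2);
        CountDownLatch gate = new CountDownLatch(1);
        Runnable waitForGate = () -> {
            try {
                gate.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            pool.execute(waitForGate); // running on the single worker
            pool.execute(waitForGate); // queued; both permits are now taken
            Thread submitter = new Thread(() -> pool.execute(waitForGate));
            submitter.start();
            submitter.join(200);                  // give it time to reach execute()
            boolean waited = submitter.isAlive(); // still parked on the semaphore
            gate.countDown();                     // let tasks finish and free permits
            submitter.join();
            return waited;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```

Note that this sketch leaks permits whenever an accepted task never reaches afterExecute, for example if the pool uses a rejection handler that silently discards tasks, or if queued tasks are dropped by remove() or shutdownNow(). A production version would need to handle those paths.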

Upvotes: 1

Views: 915

Answers (1)

jmehrens

Reputation: 11065

It depends on the ThreadPoolExecutor's state and settings, because not all task submissions pass through the BlockingQueue. Usually you just want to change the ThreadPoolExecutor's RejectedExecutionHandler to ThreadPoolExecutor.CallerRunsPolicy, which throttles submissions. If you really want to block on submit, use a CompletionService and call its take method when you want to block. You can also create a subclass and use a Semaphore to block in the execute method. See JDK-6648211, "Need for blocking ThreadPoolExecutor", for more information.
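
To illustrate the CallerRunsPolicy suggestion, here is a minimal sketch. The pool size of 1, the queue capacity of 1 and the class name CallerRunsDemo are assumptions chosen to keep the example small. Once the single worker is busy and the queue is full, the next task runs on the submitting thread, which therefore cannot submit anything else until that task finishes.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class, for illustration only.
class CallerRunsDemo {

    // Returns the name of the thread that ran the task submitted to a saturated pool.
    static String overflowThreadName() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch gate = new CountDownLatch(1);
        Runnable waitForGate = () -> {
            try {
                gate.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        AtomicReference<String> ranOn = new AtomicReference<>();
        try {
            pool.execute(waitForGate); // occupies the single worker thread
            pool.execute(waitForGate); // fills the queue (capacity 1)
            // Worker busy and queue full: the handler runs this task on the caller.
            pool.execute(() -> ranOn.set(Thread.currentThread().getName()));
            return ranOn.get();
        } finally {
            gate.countDown(); // release the two waiting tasks
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Overflow task ran on: " + overflowThreadName());
    }
}
```

The trade-off is that the submitting thread does real work instead of just waiting, so this throttles submissions rather than strictly blocking them.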

Upvotes: 4
