ghempton
ghempton

Reputation: 7947

ThreadPoolExecutor Block When its Queue Is Full?

I am trying to execute lots of tasks using a ThreadPoolExecutor. Below is a hypothetical example:

def workQueue = new ArrayBlockingQueue<Runnable>(3, false)
def threadPoolExecutor = new ThreadPoolExecutor(3, 3, 1L, TimeUnit.HOURS, workQueue)
for(int i = 0; i < 100000; i++)
    threadPoolExecutor.execute(runnable)

The problem is that I quickly get a java.util.concurrent.RejectedExecutionException since the number of tasks exceeds the size of the work queue. However, the desired behavior I am looking for is to have the main thread block until there is room in the queue. What is the best way to accomplish this?

Upvotes: 85

Views: 74393

Answers (11)

Eltan Hajiyev
Eltan Hajiyev

Reputation: 455

We already have solution in spring-integration called CallerBlocksPolicy

Code Example:

var executor = new ThreadPoolExecutor(2, 5, 1, TimeUnit.SECONDS, blockingQueue, new CallerBlocksPolicy(99999));

Upvotes: 0

Devesh Singh
Devesh Singh

Reputation: 1

You can imlement RejectedTaskHandler and get all the rejected tasks when Queue size if full. By default executors have the Abort policy so you can add these task back to the queue from handler or whatever the choice is.

public class ExecutorRejectedTaskHandlerFixedThreadPool {
public static void main(String[] args) throws InterruptedException {

    //maximum queue size : 2
    BlockingQueue<Runnable> blockingQueue =
            new LinkedBlockingQueue<Runnable>(2);


    CustomThreadPoolExecutor executor =
            new CustomThreadPoolExecutor(4, 5, 5, TimeUnit.SECONDS,
                    blockingQueue);

    RejectedTaskHandler rejectedHandler = new RejectedTaskHandler();
    executor.setRejectedExecutionHandler(rejectedHandler);
    //submit 20 the tasks for execution
    //Note: only 7 tasks(5-max pool size + 2-queue size) will be executed and rest will be rejected as queue will be overflowed
    for (int i = 0; i < 20; i++) {
        executor.execute(new Task());
    }
    System.out.println("Thread name " + Thread.currentThread().getName());


}

static class Task implements Runnable {
    @Override
    public void run() {
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Thread - " + Thread.currentThread().getName() + " performing it's job");
    }
}


static class RejectedTaskHandler implements RejectedExecutionHandler {

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        System.out.println("Task rejected" + r.toString());

    }
}


public static class CustomThreadPoolExecutor extends ThreadPoolExecutor {

    public CustomThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
                                    long keepAliveTime, TimeUnit unit,
                                    BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);

    }
}

}

Upvotes: 0

Adrian
Adrian

Reputation: 3721

One could overwrite ThreadPoolExecutor.execute(command) to use a Semaphore, e.g.:

/**
 * The setup answering the question needs to have:
 *
 * permits = 3
 * corePoolSize = permits (i.e. 3)
 * maximumPoolSize = corePoolSize (i.e. 3)
 * workQueue = anything different to null
 *
 * With this setup workQueue won’t actually be used but only 
 * to check if it’s empty, which should always be the case.
 * Any more than permits as value for maximumPoolSize will have 
 * no effect because at any moment no more than permits calls to 
 * super.execute() will be allowed by the semaphore.
 */
public class ExecutionBlockingThreadPool extends ThreadPoolExecutor {
  private final Semaphore semaphore;

  // constructor setting super(…) parameters and initializing semaphore
  //
  // Below is a bare minimum constructor; using
  // corePoolSize = maximumPoolSize = permits 
  // allows one to use SynchronousQueue because I expect
  // none other that isEmpty() to be called on it; it also
  // allows for using 0L SECONDS because no more than 
  // corePoolSize threads should be attempted to create.
  public ExecutionBlockingThreadPool(int corePoolSize) {
    super(corePoolSize, corePoolSize, 0L, SECONDS, new SynchronousQueue<Runnable>());
    semaphore = new Semaphore(corePoolSize, true);
  }

  public void execute(Runnable command) {
    semaphore.acquire();
    super.execute(() -> {
      try {
        command.run();
      } finally {
        semaphore.release();
      }
    }
  }
}

Upvotes: 0

Milo van der Zee
Milo van der Zee

Reputation: 1050

Ok, old thread but this is what I found when searching for blocking thread executor. My code tries to get a semaphore when the task is submitted to the task queue. This blocks if there are no semaphores left. As soon as a task is done the semaphore is released with the decorator. Scary part is that there is a possibility of losing semaphore but that could be solved with for example a timed job that just clears semaphores on a timed basis.

So here my solution:

class BlockingThreadPoolTaskExecutor(concurrency: Int) : ThreadPoolTaskExecutor() {
    companion object {
        lateinit var semaphore: Semaphore
    }

    init {
        semaphore = Semaphore(concurrency)
        val semaphoreTaskDecorator = SemaphoreTaskDecorator()
        this.setTaskDecorator(semaphoreTaskDecorator)
    }

    override fun <T> submit(task: Callable<T>): Future<T> {
        log.debug("submit")
        semaphore.acquire()
        return super.submit(task)
    }
}

private class SemaphoreTaskDecorator : TaskDecorator {
    override fun decorate(runnable: Runnable): Runnable {
        log.debug("decorate")
        return Runnable {
            try {
                runnable.run()
            } finally {
                log.debug("decorate done")
                semaphore.release()
            }
        }
    }
}

Upvotes: 0

Thomm
Thomm

Reputation: 516

A fairly straightforward option is to wrap your BlockingQueue with an implementation that calls put(..) when offer(..) is being invoked:

public class BlockOnOfferAdapter<T> implements BlockingQueue<T> {

(..)

  public boolean offer(E o) {
        try {
            delegate.put(o);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return true;
  }

(.. implement all other methods simply by delegating ..)

}

This works because by default put(..) waits until there is capacity in the queue when it is full, see:

    /**
     * Inserts the specified element into this queue, waiting if necessary
     * for space to become available.
     *
     * @param e the element to add
     * @throws InterruptedException if interrupted while waiting
     * @throws ClassCastException if the class of the specified element
     *         prevents it from being added to this queue
     * @throws NullPointerException if the specified element is null
     * @throws IllegalArgumentException if some property of the specified
     *         element prevents it from being added to this queue
     */
    void put(E e) throws InterruptedException;

No catching of RejectedExecutionException or complicated locking necessary.

Upvotes: 2

disrupt
disrupt

Reputation: 431

This is what I ended up doing:

int NUM_THREADS = 6;
Semaphore lock = new Semaphore(NUM_THREADS);
ExecutorService pool = Executors.newCachedThreadPool();

for (int i = 0; i < 100000; i++) {
    try {
        lock.acquire();
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
    pool.execute(() -> {
        try {
            // Task logic
        } finally {
            lock.release();
        }
    });
}

Upvotes: 9

Konstantin Abakumov
Konstantin Abakumov

Reputation: 380

What you need to do is to wrap your ThreadPoolExecutor into Executor which explicitly limits amount of concurrently executed operations inside it:

 private static class BlockingExecutor implements Executor {

    final Semaphore semaphore;
    final Executor delegate;

    private BlockingExecutor(final int concurrentTasksLimit, final Executor delegate) {
        semaphore = new Semaphore(concurrentTasksLimit);
        this.delegate = delegate;
    }

    @Override
    public void execute(final Runnable command) {
        try {
            semaphore.acquire();
        } catch (InterruptedException e) {
            e.printStackTrace();
            return;
        }

        final Runnable wrapped = () -> {
            try {
                command.run();
            } finally {
                semaphore.release();
            }
        };

        delegate.execute(wrapped);

    }
}

You can adjust concurrentTasksLimit to the threadPoolSize + queueSize of your delegate executor and it will pretty much solve your problem

Upvotes: 20

devkaoru
devkaoru

Reputation: 1162

You could use a semaphore to block threads from going into the pool.

ExecutorService service = new ThreadPoolExecutor(
    3, 
    3, 
    1, 
    TimeUnit.HOURS, 
    new ArrayBlockingQueue<>(6, false)
);

Semaphore lock = new Semaphore(6); // equal to queue capacity

for (int i = 0; i < 100000; i++ ) {
    try {
        lock.acquire();
        service.submit(() -> {
            try {
              task.run();
            } finally {
              lock.release();
            }
        });
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
}

Some gotchas:

  • Only use this pattern with a fixed thread pool. The queue is unlikely to be full often, thus new threads won't be created. Check out the java docs on ThreadPoolExecutor for more details: https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ThreadPoolExecutor.html There is a way around this, but it is out of scope of this answer.
  • Queue size should be higher than the number of core threads. If we were to make the queue size 3, what would end up happening is:

    • T0: all three threads are doing work, the queue is empty, no permits are available.
    • T1: Thread 1 finishes, releases a permit.
    • T2: Thread 1 polls the queue for new work, finds none, and waits.
    • T3: Main thread submits work into the pool, thread 1 starts work.

    The example above translates to thread the main thread blocking thread 1. It may seem like a small period, but now multiply the frequency by days and months. All of a sudden, short periods of time add up to a large amount of time wasted.

Upvotes: 9

TinkerTank
TinkerTank

Reputation: 5805

I solved this problem using a custom RejectedExecutionHandler, which simply blocks the calling thread for a little while and then tries to submit the task again:

public class BlockWhenQueueFull implements RejectedExecutionHandler {

    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {

        // The pool is full. Wait, then try again.
        try {
            long waitMs = 250;
            Thread.sleep(waitMs);
        } catch (InterruptedException interruptedException) {}

        executor.execute(r);
    }
}

This class can just be used in the thread-pool executor as a RejectedExecutionHandler like any other. In this example:

executorPool = new def threadPoolExecutor = new ThreadPoolExecutor(3, 3, 1L, TimeUnit.HOURS, workQueue, new BlockWhenQueueFull())

The only downside I see is that the calling thread might get locked slightly longer than strictly necessary (up to 250ms). For many short-running tasks, perhaps decrease the wait-time to 10ms or so. Moreover, since this executor is effectively being called recursively, very long waits for a thread to become available (hours) might result in a stack overflow.

Nevertheless, I personally like this method. It's compact, easy to understand, and works well. Am I missing anything important?

Upvotes: 0

MoeHoss
MoeHoss

Reputation: 58

Here is my code snippet in this case:

public void executeBlocking( Runnable command ) {
    if ( threadPool == null ) {
        logger.error( "Thread pool '{}' not initialized.", threadPoolName );
        return;
    }
    ThreadPool threadPoolMonitor = this;
    boolean accepted = false;
    do {
        try {
            threadPool.execute( new Runnable() {
                @Override
                public void run() {
                    try {
                        command.run();
                    }
                    // to make sure that the monitor is freed on exit
                    finally {
                        // Notify all the threads waiting for the resource, if any.
                        synchronized ( threadPoolMonitor ) {
                            threadPoolMonitor.notifyAll();
                        }
                    }
                }
            } );
            accepted = true;
        }
        catch ( RejectedExecutionException e ) {
            // Thread pool is full
            try {
                // Block until one of the threads finishes its job and exits.
                synchronized ( threadPoolMonitor ) {
                    threadPoolMonitor.wait();
                }
            }
            catch ( InterruptedException ignored ) {
                // return immediately
                break;
            }
        }
    } while ( !accepted );
}

threadPool is a local instance of java.util.concurrent.ExecutorService which has been initialized already.

Upvotes: 1

Darren Gilroy
Darren Gilroy

Reputation: 2121

In some very narrow circumstances, you can implement a java.util.concurrent.RejectedExecutionHandler that does what you need.

RejectedExecutionHandler block = new RejectedExecutionHandler() {
  rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
     executor.getQueue().put( r );
  }
};

ThreadPoolExecutor pool = new ...
pool.setRejectedExecutionHandler(block);

Now. This is a very bad idea for the following reasons

  • It's prone to deadlock because all the threads in the pool may die before the thing you put in the queue is visible. Mitigate this by setting a reasonable keep alive time.
  • The task is not wrapped the way your Executor may expect. Lots of executor implementations wrap their tasks in some sort of tracking object before execution. Look at the source of yours.
  • Adding via getQueue() is strongly discouraged by the API, and may be prohibited at some point.

A almost-always-better strategy is to install ThreadPoolExecutor.CallerRunsPolicy which will throttle your app by running the task on the thread which is calling execute().

However, sometimes a blocking strategy, with all its inherent risks, is really what you want. I'd say under these conditions

  • You only have one thread calling execute()
  • You have to (or want to) have a very small queue length
  • You absolutely need to limit the number of threads running this work (usually for external reasons), and a caller-runs strategy would break that.
  • Your tasks are of unpredictable size, so caller-runs could introduce starvation if the pool was momentarily busy with 4 short tasks and your one thread calling execute got stuck with a big one.

So, as I say. It's rarely needed and can be dangerous, but there you go.

Good Luck.

Upvotes: 85

Related Questions