Reputation: 813
ThreadPoolExecutor doc says
If corePoolSize or more threads are running, the Executor always prefers queuing a request rather than adding a new thread.
If there are more than corePoolSize but less than maximumPoolSize threads running, a new thread will be created only if the queue is full.
Is there a way to get the executor to prefer new thread creation until the max is reached, even if there are more than core-size threads, and only then start queuing? Tasks would get rejected if the queue reached its maximum size. It would be nice if the timeout setting would kick in and remove threads down to core size after a busy burst has been handled. I see the reason behind preferring to queue, since it allows for throttling; however, this customization would additionally let the queue act mainly as a list of tasks yet to be run.
Upvotes: 6
Views: 963
Reputation: 1
The trick: the queue's offer() always returns false, so the executor keeps creating threads until maximumPoolSize is reached. The RejectedExecutionHandler then performs the real offer into the backing queue, and rejects only when that queue is actually full.
CustomBlockingQueue
package com.gunjan;
import java.util.concurrent.BlockingQueue;
public abstract class CustomBlockingQueue<E> implements BlockingQueue<E> {
public BlockingQueue<E> blockingQueue;
public CustomBlockingQueue(BlockingQueue<E> blockingQueue) {
this.blockingQueue = blockingQueue;
}
@Override
final public boolean offer(E e) {
return false;
}
final public boolean customOffer(E e) {
return blockingQueue.offer(e);
}
}
ThreadPoolBlockingQueue
package com.gunjan;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
public class ThreadPoolBlockingQueue<E> extends CustomBlockingQueue<E> {
public ThreadPoolBlockingQueue(BlockingQueue<E> blockingQueue) {
super(blockingQueue);
}
@Override
public E remove() {
return this.blockingQueue.remove();
}
@Override
public E poll() {
return this.blockingQueue.poll();
}
@Override
public E element() {
return this.blockingQueue.element();
}
@Override
public E peek() {
return this.blockingQueue.peek();
}
@Override
public int size() {
return this.blockingQueue.size();
}
@Override
public boolean isEmpty() {
return this.blockingQueue.isEmpty();
}
@Override
public Iterator<E> iterator() {
return this.blockingQueue.iterator();
}
@Override
public Object[] toArray() {
return this.blockingQueue.toArray();
}
@Override
public <T> T[] toArray(T[] a) {
return this.blockingQueue.toArray(a);
}
@Override
public boolean containsAll(Collection<?> c) {
return this.blockingQueue.containsAll(c);
}
@Override
public boolean addAll(Collection<? extends E> c) {
return this.blockingQueue.addAll(c);
}
@Override
public boolean removeAll(Collection<?> c) {
return this.blockingQueue.removeAll(c);
}
@Override
public boolean retainAll(Collection<?> c) {
return this.blockingQueue.retainAll(c);
}
@Override
public void clear() {
this.blockingQueue.clear();
}
@Override
public boolean add(E e) {
return this.blockingQueue.add(e);
}
@Override
public void put(E e) throws InterruptedException {
this.blockingQueue.put(e);
}
@Override
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
return this.blockingQueue.offer(e, timeout, unit);
}
@Override
public E take() throws InterruptedException {
return this.blockingQueue.take();
}
@Override
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
return this.blockingQueue.poll(timeout, unit);
}
@Override
public int remainingCapacity() {
return this.blockingQueue.remainingCapacity();
}
@Override
public boolean remove(Object o) {
return this.blockingQueue.remove(o);
}
@Override
public boolean contains(Object o) {
return this.blockingQueue.contains(o);
}
@Override
public int drainTo(Collection<? super E> c) {
return this.blockingQueue.drainTo(c);
}
@Override
public int drainTo(Collection<? super E> c, int maxElements) {
return this.blockingQueue.drainTo(c, maxElements);
}
}
RejectedExecutionHandlerImpl
package com.gunjan;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
public class RejectedExecutionHandlerImpl implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
boolean inserted = ((CustomBlockingQueue<Runnable>) executor.getQueue()).customOffer(r);
if (!inserted) {
throw new RejectedExecutionException();
}
}
}
CustomThreadPoolExecutorTest
package com.gunjan;
import java.util.concurrent.*;
public class CustomThreadPoolExecutorTest {
public static void main(String[] args) throws InterruptedException {
LinkedBlockingQueue<Runnable> linkedBlockingQueue = new LinkedBlockingQueue<Runnable>(500);
CustomBlockingQueue<Runnable> customLinkedBlockingQueue = new ThreadPoolBlockingQueue<Runnable>(linkedBlockingQueue);
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 100, 60, TimeUnit.SECONDS,
customLinkedBlockingQueue, new RejectedExecutionHandlerImpl());
for (int i = 0; i < 750; i++) {
try {
threadPoolExecutor.submit(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000);
System.out.println(threadPoolExecutor);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
} catch (RejectedExecutionException e) {
e.printStackTrace();
}
}
threadPoolExecutor.shutdown();
threadPoolExecutor.awaitTermination(Integer.MAX_VALUE, TimeUnit.MINUTES);
System.out.println(threadPoolExecutor);
}
}
Upvotes: 0
Reputation: 11006
There's no way to get this exact behavior with a ThreadPoolExecutor.
But here are a couple of solutions:
Consider: if fewer than corePoolSize threads are running, a new thread is created for every submitted task until corePoolSize threads are running. After that, a new thread is only created if the queue is full and fewer than maximumPoolSize threads are running.
So, wrap a ThreadPoolExecutor in a class which monitors how fast items are being queued. Then, change the core pool size to a higher value when many items are being submitted. This will cause a new thread to be created each time a new item is submitted.
When the submission burst is done, the core pool size needs to be manually reduced again so the threads can naturally time out. If you're worried a busy burst could end abruptly, leaving the manual reduction unreached, be sure to use allowCoreThreadTimeOut(true).
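A rough sketch of this first approach, assuming a thin wrapper around the executor (the class and method names here are illustrative, not from any library):

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Illustrative wrapper: raises corePoolSize while submissions outpace the
// current thread count, so execute() starts a new thread instead of queuing.
public class BurstyExecutor {
    private final ThreadPoolExecutor tpe;
    private final int normalCore;

    public BurstyExecutor(int core, int max, int queueCap) {
        this.normalCore = core;
        this.tpe = new ThreadPoolExecutor(core, max, 60, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(queueCap));
        // Let threads above the original core count die off after the burst.
        tpe.allowCoreThreadTimeOut(true);
    }

    public synchronized void execute(Runnable task) {
        int size = tpe.getPoolSize();
        int core = tpe.getCorePoolSize();
        // All existing threads accounted for? Raise the core size so the
        // executor adds a thread rather than queuing, up to maximumPoolSize.
        if (size >= core && core < tpe.getMaximumPoolSize()) {
            tpe.setCorePoolSize(core + 1);
        }
        tpe.execute(task);
    }

    public synchronized void coolDown() {
        // After the burst: shrink the core size so idle threads can time out.
        tpe.setCorePoolSize(normalCore);
    }

    public ThreadPoolExecutor pool() {
        return tpe;
    }
}
```

This raises the core size one thread per submission; a real implementation might instead track the submission rate over a window, as described above.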
Create a fixed thread pool, and allowCoreThreadTimeOut(true).
Unfortunately this uses more threads during low-submission periods, and keeps no idle threads around during zero traffic.
Use the 1st solution if you have the time, need, and inclination, as it handles a wider range of submission frequencies and so is the more flexible solution.
Otherwise use the 2nd solution.
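A sketch of the 2nd option (the sizes and the factory name are placeholders):

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class FixedBurstPool {
    // Fixed-size pool: core == max, so a new thread is started for every
    // submission until the thread limit is hit; only then does queuing begin.
    public static ThreadPoolExecutor create(int maxThreads, int queueCap) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                maxThreads, maxThreads,               // core == max
                60, TimeUnit.SECONDS,                 // idle timeout
                new LinkedBlockingQueue<Runnable>(queueCap));
        // Without this, "core" threads would never time out.
        pool.allowCoreThreadTimeOut(true);
        return pool;
    }
}
```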
Upvotes: 6
Reputation: 40396
It seems that your preference is minimal latency during times of low activity. For that I would just set corePoolSize equal to maximumPoolSize and let the extra threads hang around. During high-activity times those threads will be there anyway. During low-activity times their existence won't have much impact. You can enable the core thread timeout if you want them to die, though.
That way all the threads will always be available to execute a task as soon as possible.
Upvotes: 1
Reputation: 17319
Just do what Executors.newFixedThreadPool does and set core and max to the same value. Here's the newFixedThreadPool source from Java 6:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
What you can do if you have an existing one:
ThreadPoolExecutor tpe = ... ;
tpe.setCorePoolSize(tpe.getMaximumPoolSize());
Edit: As William points out in the comments, this means all threads are core threads, so none of them will time out and terminate. To change that, just call allowCoreThreadTimeOut(true) on the executor (note that the keep-alive time must be greater than zero for this call to be allowed). This lets the threads time out and be swept away when the executor isn't in use.
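Putting the two calls together (the pool sizes here are illustrative):

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CoreEqualsMaxDemo {
    public static void main(String[] args) {
        // Hypothetical existing pool with core < max.
        ThreadPoolExecutor tpe = new ThreadPoolExecutor(5, 100,
                60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        // Make every thread a core thread...
        tpe.setCorePoolSize(tpe.getMaximumPoolSize());
        // ...but allow core threads to time out when idle.
        tpe.allowCoreThreadTimeOut(true);
        tpe.shutdown();
    }
}
```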
Upvotes: 2