foamroll

Reputation: 813

ThreadPoolExecutor's queuing behavior customizable to prefer new thread creation over queuing?

The ThreadPoolExecutor documentation says:

If corePoolSize or more threads are running, the Executor always prefers queuing a request rather than adding a new thread.


If there are more than corePoolSize but less than maximumPoolSize threads running, a new thread will be created only if the queue is full.

Is there a way to get the executor to prefer creating new threads until the maximum is reached, even when more than corePoolSize threads are already running, and only then start queuing? Tasks would be rejected once the queue reached its maximum size. It would be nice if the keep-alive timeout then kicked in and removed threads back down to the core size after a busy burst had been handled. I see the reason for preferring to queue, since it allows for throttling; however, this customization would additionally let the queue act mainly as a list of tasks yet to be run.
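For reference, the default queue-first behavior described above can be reproduced with a small sketch. The class name QueueFirstDemo and the sizes used (core 1, max 4, queue capacity 10) are illustrative assumptions, not anything from the documentation.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class QueueFirstDemo {

    /** Returns {poolSize, queuedTasks} after submitting five blocking tasks. */
    static int[] observe() {
        // Illustrative sizes: core = 1, max = 4, bounded queue of 10.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 4, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < 5; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // hold the worker so later tasks pile up
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            // The queue is far from full, so no thread beyond the core one exists.
            return new int[] {pool.getPoolSize(), pool.getQueue().size()};
        } finally {
            release.countDown(); // let the tasks finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] result = observe();
        System.out.println("pool size = " + result[0] + ", queued = " + result[1]);
    }
}
```

With these numbers the pool stays at one thread and the other four tasks wait in the queue, even though three more threads would be allowed.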

Upvotes: 6

Views: 963

Answers (4)

Gunjan

Reputation: 1

CustomBlockingQueue

package com.gunjan;

import java.util.concurrent.BlockingQueue;

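public abstract class CustomBlockingQueue<E> implements BlockingQueue<E> {

    // The real queue that holds tasks once the pool is at its maximum size.
    protected final BlockingQueue<E> blockingQueue;

    public CustomBlockingQueue(BlockingQueue<E> blockingQueue) {
        this.blockingQueue = blockingQueue;
    }

    // Always refuse, so ThreadPoolExecutor believes the queue is full and
    // starts a new thread instead (until maximumPoolSize is reached).
    @Override
    public final boolean offer(E e) {
        return false;
    }

    // Called from the rejection handler to enqueue the task for real.
    public final boolean customOffer(E e) {
        return blockingQueue.offer(e);
    }
}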

ThreadPoolBlockingQueue

package com.gunjan;

import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class ThreadPoolBlockingQueue<E> extends CustomBlockingQueue<E> {

    public ThreadPoolBlockingQueue(BlockingQueue<E> blockingQueue) {
        super(blockingQueue);
    }

    @Override
    public E remove() {
        return this.blockingQueue.remove();
    }

    @Override
    public E poll() {
        return this.blockingQueue.poll();
    }

    @Override
    public E element() {
        return this.blockingQueue.element();
    }

    @Override
    public E peek() {
        return this.blockingQueue.peek();
    }

    @Override
    public int size() {
        return this.blockingQueue.size();
    }

    @Override
    public boolean isEmpty() {
        return this.blockingQueue.isEmpty();
    }

    @Override
    public Iterator<E> iterator() {
        return this.blockingQueue.iterator();
    }

    @Override
    public Object[] toArray() {
        return this.blockingQueue.toArray();
    }

    @Override
    public <T> T[] toArray(T[] a) {
        return this.blockingQueue.toArray(a);
    }

    @Override
    public boolean containsAll(Collection<?> c) {
        return this.blockingQueue.containsAll(c);
    }

    @Override
    public boolean addAll(Collection<? extends E> c) {
        return this.blockingQueue.addAll(c);
    }

    @Override
    public boolean removeAll(Collection<?> c) {
        return this.blockingQueue.removeAll(c);
    }

    @Override
    public boolean retainAll(Collection<?> c) {
        return this.blockingQueue.retainAll(c);
    }

    @Override
    public void clear() {
        this.blockingQueue.clear();
    }

    @Override
    public boolean add(E e) {
        return this.blockingQueue.add(e);
    }

    @Override
    public void put(E e) throws InterruptedException {
        this.blockingQueue.put(e);
    }

    @Override
    public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
        return this.blockingQueue.offer(e, timeout, unit);
    }

    @Override
    public E take() throws InterruptedException {
        return this.blockingQueue.take();
    }

    @Override
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        return this.blockingQueue.poll(timeout, unit);
    }

    @Override
    public int remainingCapacity() {
        return this.blockingQueue.remainingCapacity();
    }

    @Override
    public boolean remove(Object o) {
        return this.blockingQueue.remove(o);
    }

    @Override
    public boolean contains(Object o) {
        return this.blockingQueue.contains(o);
    }

    @Override
    public int drainTo(Collection<? super E> c) {
        return this.blockingQueue.drainTo(c);
    }

    @Override
    public int drainTo(Collection<? super E> c, int maxElements) {
        return this.blockingQueue.drainTo(c, maxElements);
    }
}

RejectedExecutionHandlerImpl

package com.gunjan;

import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;

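public class RejectedExecutionHandlerImpl implements RejectedExecutionHandler {

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        // A task submitted after shutdown must stay rejected, not be queued.
        if (executor.isShutdown()) {
            throw new RejectedExecutionException("Executor has been shut down");
        }
        // offer() always returns false, so the executor ends up here once it
        // cannot add another thread; only now try the real queue.
        CustomBlockingQueue<Runnable> queue = (CustomBlockingQueue<Runnable>) executor.getQueue();
        if (!queue.customOffer(r)) {
            throw new RejectedExecutionException("Queue is full");
        }
    }
}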

CustomThreadPoolExecutorTest

package com.gunjan;

import java.util.concurrent.*;

public class CustomThreadPoolExecutorTest {

public static void main(String[] args) throws InterruptedException {
    // Bounded backing queue: up to 500 tasks wait once all 100 threads are busy.
    LinkedBlockingQueue<Runnable> linkedBlockingQueue = new LinkedBlockingQueue<Runnable>(500);
    CustomBlockingQueue<Runnable> customLinkedBlockingQueue = new ThreadPoolBlockingQueue<Runnable>(linkedBlockingQueue);
    final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 100, 60, TimeUnit.SECONDS,
            customLinkedBlockingQueue, new RejectedExecutionHandlerImpl());


    for (int i = 0; i < 750; i++) {
        try {
            threadPoolExecutor.submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(1000);
                        System.out.println(threadPoolExecutor);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        } catch (RejectedExecutionException e) {
            e.printStackTrace();
        }

    }

    threadPoolExecutor.shutdown();
    threadPoolExecutor.awaitTermination(Integer.MAX_VALUE, TimeUnit.MINUTES);
    System.out.println(threadPoolExecutor);
}
}

Upvotes: 0

William Morrison

Reputation: 11006

No way to get this exact behavior with a ThreadPoolExecutor.

But here are a couple of solutions:

  1. Consider,

    • If fewer than corePoolSize threads are running, a new thread is created for every item submitted until corePoolSize threads are running.

    • Beyond that, a new thread is created only if the queue is full and fewer than maximumPoolSize threads are running.

    So, wrap a ThreadPoolExecutor in a class which monitors how fast items are being queued. Then, change the core pool size to a higher value when many items are being submitted. This will cause a new thread to be created each time a new item is submitted.

    When the submission burst is done, the core pool size needs to be manually reduced again so the threads can time out naturally. If you're worried the busy burst could end abruptly, causing the manual method to fail, be sure to call allowCoreThreadTimeOut(true).

  2. Create a fixed thread pool and call allowCoreThreadTimeOut(true).

    Unfortunately this uses more threads during low submission bursts, and keeps no idle threads around during zero traffic.

Use the 1st solution if you have the time, need, and inclination as it will handle a wider range of submission frequency and so is a better solution in terms of flexibility.

Otherwise use the 2nd solution.
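A minimal sketch of the first solution might look like the following. The class BurstAwareExecutor, its endBurst method, and the burst signal it uses are all hypothetical: instead of measuring the submission rate, it simply raises the core pool size whenever the pool already holds as many threads as the current core size, which is a cruder heuristic than the monitoring described above.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical wrapper: grows corePoolSize during a burst, restores it afterwards. */
public class BurstAwareExecutor implements Executor {
    private final ThreadPoolExecutor pool;
    private final int baseCoreSize;

    public BurstAwareExecutor(int baseCoreSize, int maxSize,
                              int queueCapacity, long keepAliveMillis) {
        this.baseCoreSize = baseCoreSize;
        this.pool = new ThreadPoolExecutor(baseCoreSize, maxSize, keepAliveMillis,
                TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(queueCapacity));
    }

    @Override
    public synchronized void execute(Runnable task) {
        int core = pool.getCorePoolSize();
        // Assumed burst signal: the pool is already at its core size.
        // Raising the core size makes the next execute() start a thread
        // instead of queuing, up to maximumPoolSize.
        if (pool.getPoolSize() >= core && core < pool.getMaximumPoolSize()) {
            pool.setCorePoolSize(core + 1);
        }
        pool.execute(task);
    }

    /** Restore the base core size so surplus idle threads can time out. */
    public synchronized void endBurst() {
        pool.setCorePoolSize(baseCoreSize);
    }

    public ThreadPoolExecutor pool() {
        return pool;
    }

    /** Returns {poolSize, queuedTasks} after four blocking tasks (core 1, max 3). */
    static int[] demo() {
        BurstAwareExecutor executor = new BurstAwareExecutor(1, 3, 10, 500);
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < 4; i++) {
                executor.execute(() -> {
                    try {
                        release.await(); // keep every worker busy
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return new int[] {executor.pool().getPoolSize(),
                              executor.pool().getQueue().size()};
        } finally {
            release.countDown();
            executor.endBurst();
            executor.pool().shutdown();
        }
    }

    public static void main(String[] args) {
        int[] result = demo();
        System.out.println("pool size = " + result[0] + ", queued = " + result[1]);
    }
}
```

In the demo the pool grows to three threads before the fourth task is queued. Deciding when to call endBurst is left to the caller; a real implementation would probably drive it from a timer or from the observed submission rate.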

Upvotes: 6

Jason C

Reputation: 40396

It seems that your preference is minimal latency during times of low activity. For that I would just set corePoolSize to the maximum and let the extra threads hang around. During high-activity times these threads will be there anyway. During low-activity times their existence won't have much impact. If you do want them to die, you can enable the core thread timeout with allowCoreThreadTimeOut(true).

That way all the threads will always be available to execute a task as soon as possible.
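As a rough sketch of this setup, assuming a bounded queue so that overflow is rejected as the question asks (the class name ElasticFixedPool and the sizes are made up for illustration):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ElasticFixedPool {

    /** Core size equals max size; idle threads may still time out. */
    static ThreadPoolExecutor create(int maxThreads, int queueCapacity,
                                     long keepAliveMillis) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                maxThreads, maxThreads,
                keepAliveMillis, TimeUnit.MILLISECONDS, // must be > 0 for the call below
                new ArrayBlockingQueue<>(queueCapacity));
        // Without this, core threads would never be reclaimed.
        pool.allowCoreThreadTimeOut(true);
        return pool;
    }

    /** Returns {poolSize, queuedTasks} after three blocking tasks on a pool of three. */
    static int[] demo() {
        ThreadPoolExecutor pool = create(3, 10, 200);
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < 3; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // keep the worker busy
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            // Every task got its own thread; nothing had to wait in the queue.
            return new int[] {pool.getPoolSize(), pool.getQueue().size()};
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] result = demo();
        System.out.println("pool size = " + result[0] + ", queued = " + result[1]);
    }
}
```

Here threads are created up to the maximum before anything is queued, and once the pool goes idle for longer than the keep-alive time it can shrink all the way to zero threads.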

Upvotes: 1

Brian

Reputation: 17319

Just do what Executors.newFixedThreadPool does and set core and max to the same value. Here's the newFixedThreadPool source from Java 6:

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

What you can do if you have an existing one:

ThreadPoolExecutor tpe = ... ;
tpe.setCorePoolSize(tpe.getMaximumPoolSize());

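Edit: As William points out in the comments, this means that all threads are core threads, so none of them will time out and terminate. To change this behavior, call allowCoreThreadTimeOut(true) on the ThreadPoolExecutor. Idle threads can then time out and be reclaimed when the executor isn't in use. Note that this requires a nonzero keep-alive time: with the 0L used by newFixedThreadPool, allowCoreThreadTimeOut(true) throws an IllegalArgumentException, so call setKeepAliveTime with a positive value first.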

Upvotes: 2
