de_xtr

Reputation: 902

ThreadPoolExecutor not executing all tasks

I have a ThreadPoolExecutor with corePoolSize = 5, maxPoolSize = 10, queueSize = 10 and keepAlive = 1000 seconds. I submit 100 Runnable tasks to it. The number of tasks that actually get executed varies from run to run, and not all of them are executed. Nothing is reported by the RejectedExecutionHandler either. I believe my understanding of ThreadPoolExecutor is wrong somehow. Can anybody help me? How can I get all the tasks executed?

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class TestThreadPoolExecutor {
    public static void main(String[] args) {
        ThreadFactory threadFactory = Executors.defaultThreadFactory();
        ArrayBlockingQueue<Runnable> arrayBlockingQueue = new ArrayBlockingQueue<Runnable>(10);

        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10, 1000, TimeUnit.SECONDS,
                arrayBlockingQueue, threadFactory, new RejectedExecutionHandlerImpl());
        MonitorThread monitor = new MonitorThread(threadPoolExecutor, 3);
        Thread monitorThread = new Thread(monitor);
        monitorThread.start();
        for (int i = 0; i < 100; i++) {
            threadPoolExecutor.execute(new DummyRunnableTask(i));
        }
        //threadPoolExecutor.shutdown();
        //monitor.shutDown();
    }
}

class DummyRunnableTask implements Runnable {

    private int i;

    public DummyRunnableTask(int i) {
        super();
        this.i = i;
    }

    @Override
    public void run() {
        /*try {
            Thread.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }*/
        System.out.println("Thread Name:=" + Thread.currentThread().getName() + " is working for id=" + i);
    }

}

class RejectedExecutionHandlerImpl implements RejectedExecutionHandler {

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        System.out.println();
    }

}

class MonitorThread implements Runnable {
    private ThreadPoolExecutor executor;
    private int seconds;
    // volatile so that a shutDown() call from another thread is seen by run()
    private volatile boolean run = true;

    public MonitorThread(ThreadPoolExecutor executor, int seconds) {
        super();
        this.executor = executor;
        this.seconds = seconds;
    }

    public void shutDown() {
        this.run = false;
    }

    @Override
    public void run() {
        while (run) {
            System.out.println(
                    String.format("[monitor] [%d/%d] Active: %d, Completed: %d, Task: %d, isShutdown: %s, isTerminated: %s",
                            this.executor.getPoolSize(),
                            this.executor.getCorePoolSize(),
                            this.executor.getActiveCount(),
                            this.executor.getCompletedTaskCount(),
                            this.executor.getTaskCount(),
                            this.executor.isShutdown(),
                            this.executor.isTerminated()));
            try {
                Thread.sleep(seconds * 1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

Sample Output:

...............
Thread Name:=pool-1-thread-7 is working for id=97
Thread Name:=pool-1-thread-2 is working for id=95
Thread Name:=pool-1-thread-10 is working for id=94
Thread Name:=pool-1-thread-4 is working for id=93
[monitor] [0/5] Active: 0, Completed: 0, Task: 1, isShutdown: false, isTerminated: false
[monitor] [10/5] Active: 0, Completed: 88, Task: 88, isShutdown: false, isTerminated: false
[monitor] [10/5] Active: 0, Completed: 88, Task: 88, isShutdown: false, isTerminated: false

Upvotes: 8

Views: 13508

Answers (4)

de_xtr

Reputation: 902

Thanks to @Mandy8055, @Ritesh, @Speise

I found two ways in which this issue can be handled gracefully if the required queueSize can't be predicted.

  1. Write a rejection handler that submits the task back to the executor. But then again, as @Ritesh pointed out, the producer may be faster than the consumer, and because the handler calls execute() again from inside the rejection, repeated rejections nest and we may end up with a StackOverflowError. Anyway, here is what I did:

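    class RejectedExecutionHandlerImpl implements RejectedExecutionHandler {

        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            // Assumes DummyRunnableTask has a getI() getter; the version in the question does not.
            DummyRunnableTask task = (DummyRunnableTask) r;

            System.out.println("Task id " + task.getI() + " got rejected, resubmitting");
            try {
                // Wait a little in the hope that a worker or queue slot frees up.
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // If the pool is still saturated, this call is rejected again and recurses.
            executor.execute(r);
        }
    }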
    

    Now, this is not exactly a good way to handle it, as you just don't know whether a running task will finish and free up a slot within 100 ms. Maybe we can collect the rejected tasks in a list instead and resubmit them all later.

  2. The other way is to block the submit/execute method of the ThreadPoolExecutor once a certain limit (maxPoolSize + queueSize) is reached and make it wait until there is space. We do that by using a bounded Semaphore in a custom wrapper around ThreadPoolExecutor:

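    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.RejectedExecutionException;
    import java.util.concurrent.Semaphore;
    import java.util.concurrent.ThreadFactory;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    class CustomExecutor {

        private final ThreadPoolExecutor executor;
        private final Semaphore semaphore;

        public CustomExecutor(int corePoolSize, int maxPoolSize, int keepAliveTime, TimeUnit unit, int queueSize, ThreadFactory factory) {
            BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(queueSize);
            this.executor = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, unit, queue, factory);
            // One permit per slot: a task either occupies a worker thread or waits in the queue.
            this.semaphore = new Semaphore(queueSize + maxPoolSize);
        }

        public void shutDown() {
            this.executor.shutdown();
        }

        public ThreadPoolExecutor getExecutor() {
            return executor;
        }

        private void exec(final Runnable command) throws InterruptedException {
            // Blocks the caller while all permits are taken.
            semaphore.acquire();
            try {
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            command.run();
                        } finally {
                            // Free the permit even if the task throws.
                            semaphore.release();
                        }
                    }
                });
            } catch (RejectedExecutionException e) {
                // Still possible, e.g. after shutdown, or in the short window between
                // release() above and the worker taking its next task from a full queue.
                semaphore.release();
                throw e;
            }
        }

        public void execute(Runnable command) throws InterruptedException {
            exec(command);
        }
    }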
    

Let us know if you can think of any other way.
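
The "list of rejected tasks" idea mentioned under option 1 could look roughly like the sketch below. It is only an illustration under assumptions: the class name `ResubmitRejectedDemo`, the method `runAll` and the 1 ms back-off are made up for this example, and it relies on there being a single producer thread (the handler runs in the thread that called `execute`).

```java
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the original code.
class ResubmitRejectedDemo {

    /** Submits taskCount trivial tasks and returns how many actually ran. */
    static int runAll(int taskCount) {
        // The handler only records the rejected task; it never calls execute() itself.
        Queue<Runnable> rejected = new ConcurrentLinkedQueue<>();
        RejectedExecutionHandler handler = (r, pool) -> rejected.add(r);

        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                5, 10, 1000, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), handler);
        AtomicInteger executed = new AtomicInteger();
        try {
            for (int i = 0; i < taskCount; i++) {
                executor.execute(executed::incrementAndGet);
            }
            // Drain the buffer from the producer thread. A task that is rejected
            // again simply lands back in the buffer and is retried later.
            Runnable task;
            while ((task = rejected.poll()) != null) {
                if (executor.getQueue().remainingCapacity() == 0) {
                    Thread.sleep(1); // assumed back-off while the work queue is full
                }
                executor.execute(task);
            }
            executor.shutdown();
            if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("pool did not terminate in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow(); // no-op after a normal shutdown
        }
        return executed.get();
    }

    public static void main(String[] args) {
        System.out.println("Executed " + runAll(100) + " of 100 tasks");
    }
}
```

Because the retry loop runs in the producer rather than inside the handler, there is no recursion and therefore no risk of a StackOverflowError.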

Upvotes: 2

Ritesh

Reputation: 1847

Parameters of ThreadPoolExecutor:

corePoolSize - the number of threads to keep in the pool, even if they are idle.

maximumPoolSize - the maximum number of threads to allow in the pool.

keepAliveTime - when the number of threads is greater than the core, this is the maximum time that excess idle threads will wait for new tasks before terminating.

unit - the time unit for the keepAliveTime argument.

workQueue - the queue to use for holding tasks before they are executed. This queue will hold only the Runnable tasks submitted by the execute method.

handler - the handler to use when execution is blocked because the thread bounds and queue capacities are reached.

Your scenario: corePoolSize = 5, maximumPoolSize = 10, workQueue capacity = 10

And you are pushing 100 tasks in a for loop.

First of all, you can never get more than 10 worker threads.

Suppose all your worker threads are busy; then the most the work queue can hold is 10 tasks. After that, any further submission is rejected, and you can trap that rejection in the rejection handler. So increasing the work queue capacity will help solve the issue (but it is not recommended).
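
To make the arithmetic concrete, here is a minimal sketch that counts rejections when no task can finish during submission. The class `SaturationDemo`, the method `countRejected` and the latch used to keep the workers busy are assumptions for illustration, not part of the question's code.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: core = 5, max = 10, queue capacity is the parameter.
class SaturationDemo {

    /** Submits 100 tasks that cannot finish yet and returns the number rejected. */
    static int countRejected(int queueCapacity) {
        AtomicInteger rejected = new AtomicInteger();
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                5, 10, 1000, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(queueCapacity),
                (r, pool) -> rejected.incrementAndGet()); // count instead of printing
        try {
            for (int i = 0; i < 100; i++) {
                executor.execute(() -> {
                    try {
                        release.await(); // keep every worker busy during submission
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
        } finally {
            release.countDown(); // let the accepted tasks finish
            executor.shutdown();
        }
        return rejected.get();
    }

    public static void main(String[] args) {
        System.out.println("queue=10 -> rejected " + countRejected(10));
        System.out.println("queue=90 -> rejected " + countRejected(90));
    }
}
```

With a queue of 10 the pool can hold 10 running plus 10 queued tasks, so 80 of the 100 are rejected; with a queue of 90 all 100 fit. In the question the tasks are very short, so some slots free up during the loop, which is why the number of executed tasks varies (88 in the sample output).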

What you can try:

Add a time gap between submissions instead of using a tight for loop, which tries to push all the work at virtually the same instant.

Set your maximumPoolSize to the number of CPU cores x 2 (or 4).

When your work is rejected, you can try resubmitting it after some delay.

PS: This is a producer-consumer problem. It always requires the consumer to be faster than the producer. For short periods the producer becomes faster and we see a spike in workload. Queues are designed to absorb that spike, but when even the queue overflows, the rejection is sent back to the producer, and it is the producer's duty to handle it gracefully, either by resubmitting or by dropping the task.
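
One built-in way for the producer to absorb a rejection, not mentioned above and shown here only as a sketch, is `ThreadPoolExecutor.CallerRunsPolicy`: the rejected task runs in the thread that called `execute`, which slows the producer down while the pool is saturated. The class name `CallerRunsDemo` and the method `runAll` are made up for the example.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class using the JDK's CallerRunsPolicy as the rejection handler.
class CallerRunsDemo {

    /** Submits taskCount trivial tasks and returns how many actually ran. */
    static int runAll(int taskCount) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                5, 10, 1000, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(10),
                new ThreadPoolExecutor.CallerRunsPolicy());
        AtomicInteger executed = new AtomicInteger();
        try {
            for (int i = 0; i < taskCount; i++) {
                // When the pool and queue are full, this call runs the task
                // directly in the submitting thread instead of dropping it.
                executor.execute(executed::incrementAndGet);
            }
            executor.shutdown();
            if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("pool did not terminate in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow(); // no-op after a normal shutdown
        }
        return executed.get();
    }

    public static void main(String[] args) {
        System.out.println("Executed " + runAll(100) + " of 100 tasks");
    }
}
```

The trade-off is that the submitting thread is busy running a task and cannot submit anything else in the meantime, which may or may not be acceptable for your producer.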

Upvotes: 1

user7571182

Reputation:

I appreciate the answer provided by @Speise, but to add some clarifying details, I would quote the following statements from the Oracle docs:

If fewer than corePoolSize threads are running, the Executor always prefers adding a new thread rather than queuing.

If corePoolSize or more threads are running, the Executor always prefers queuing a request rather than adding a new thread.

If a request cannot be queued, a new thread is created unless this would exceed maximumPoolSize, in which case, the task will be rejected.
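
The order of these three rules can be observed with a small trace. This is a sketch under assumptions: the class `PoolGrowthTrace`, the method `stateAfter` and the blocking latch are hypothetical, and the pool uses the question's settings (core 5, max 10, queue 10).

```java
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class that records the pool state after n submissions.
class PoolGrowthTrace {

    /** Returns {poolSize, queuedTasks, rejectedTasks} after submitting n blocked tasks. */
    static int[] stateAfter(int n) {
        AtomicInteger rejected = new AtomicInteger();
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                5, 10, 1000, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(10),
                (r, pool) -> rejected.incrementAndGet());
        try {
            for (int i = 0; i < n; i++) {
                executor.execute(() -> {
                    try {
                        release.await(); // block so no thread or queue slot frees up
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return new int[] {executor.getPoolSize(), executor.getQueue().size(), rejected.get()};
        } finally {
            release.countDown(); // let the accepted tasks finish
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        for (int n : new int[] {5, 15, 20, 21}) {
            System.out.println(n + " tasks -> " + Arrays.toString(stateAfter(n)));
        }
    }
}
```

The first 5 tasks each start a core thread, tasks 6 to 15 fill the queue while the pool stays at 5 threads, tasks 16 to 20 grow the pool to 10, and task 21 is rejected.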

My reading of the docs is that, to avoid rejection of tasks, you should increase your maxPoolSize rather than the queue size, because there may be cases (as you mentioned in the comments) where you do not know beforehand how many tasks are going to be submitted. I would also recommend using an unbounded queue (LinkedBlockingQueue) in this type of scenario. Another quick fix is to increase the interval between submissions to the queue.

Also, increasing the queue size hampers performance, so why not leave the creation of threads to the JVM? (The thread pool takes action according to the maxPoolSize.)
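
A minimal sketch of the unbounded-queue suggestion follows; the class name `UnboundedQueueDemo` and the method `run` are made up for illustration. One caveat from the ThreadPoolExecutor Javadoc applies: with an unbounded queue a request can always be queued, so the pool never grows beyond corePoolSize and maximumPoolSize has no effect.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: same pool settings as the question, but an unbounded queue.
class UnboundedQueueDemo {

    /** Returns {tasksExecuted, largestPoolSize} for 100 tasks on an unbounded queue. */
    static int[] run() {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                5, 10, 1000, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        AtomicInteger executed = new AtomicInteger();
        try {
            for (int i = 0; i < 100; i++) {
                executor.execute(executed::incrementAndGet); // never rejected while running
            }
            executor.shutdown();
            if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("pool did not terminate in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow(); // no-op after a normal shutdown
        }
        return new int[] {executed.get(), executor.getLargestPoolSize()};
    }

    public static void main(String[] args) {
        int[] result = run();
        System.out.println("Executed: " + result[0] + ", largest pool size: " + result[1]);
    }
}
```

All 100 tasks run, but only the 5 core threads are ever created. The price is that the queue can grow without limit if the producer stays faster than the workers.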

Upvotes: 4

Speise

Reputation: 809

All 100 tasks cannot be processed because maxPoolSize = 10 and queueSize = 10, which means that in the worst case your pool executor can hold only 20 tasks at a time. The best case depends on the performance and complexity of the job inside each task. Try increasing your queueSize to 90. Then, for sure, up to 90 of them can wait in the queue while the other 10 are being worked on. The best explanation can be found at this link:

If a request cannot be queued, a new thread is created unless this would exceed maximumPoolSize, in which case, the task will be rejected.

Upvotes: 1
