user10916892

Reputation: 997

Custom thread pool implementation in Java

For learning purposes I am trying to implement my own thread pool in Java. Below is what I have implemented. I have a few questions about this implementation:

  1. I am using a BlockingQueue, just as the built-in Java executors do, and those executors expect Runnable objects (through the execute method). In my case, though, it seems I could queue any kind of object instead of Runnable. So why do the Java executors expect Runnable? I tried looking into the source code but could not figure it out yet.

  2. Is there anything else wrong with this primitive implementation?

Please find the code.

public class CustomThreadPool {

private final BlockingQueue<Runnable> blockingQueue;
private final Worker[] workers;

public CustomThreadPool(final int numOfThreads) {
    blockingQueue = new LinkedBlockingQueue<>();
    workers = new Worker[numOfThreads];

    for (int i = 0; i < numOfThreads; i++) {
        workers[i] = new Worker();
        workers[i].start();
    }
}

public void execute(final Runnable task) {
    blockingQueue.add(task);
}

public void shutdownImmediately() {
    for (int i = 0; i < workers.length; i++) {
        workers[i].shutdownSignal = true;
        workers[i] = null;
    }
}

 private class Worker extends Thread {
    private Runnable taskToPerform = null;
    boolean shutdownSignal = false;

    @Override
    public void run() {
        while(true && !shutdownSignal) {
            taskToPerform = blockingQueue.poll();
            if (taskToPerform != null) {
                taskToPerform.run();
            }
            if(shutdownSignal) {
                break;
            }
        }
    }
}

public static void main(String[] args) throws Exception {
    final CustomThreadPool threadPool = new CustomThreadPool(5);
    for (int i = 0; i < 20; i++) {
        threadPool.execute(() -> System.out.println(Thread.currentThread().getName()));
    }
    Thread.sleep(1*1000);
    threadPool.shutdownImmediately();
}

}

Upvotes: 1

Views: 3530

Answers (1)

miskender

Reputation: 7948

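  1. An executor expects a Runnable or a Callable because it calls the run() or call() method of these interfaces when it runs the tasks you submitted. The worker thread needs a known method to invoke on whatever it takes from the queue, and the interface guarantees that method exists.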

  2. Your implementation does not use the blocking aspect of BlockingQueue. Because poll() does not block, your pool threads spin constantly (burning CPU time) in the while(true && !shutdownSignal) loop whenever the queue is empty. This is not something you want in a thread pool.
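On point 1, a minimal sketch of why the element type matters: the code that drains the queue can only run a task through a method it knows about. The Job interface and the drain helper below are hypothetical names made up for this illustration. Any interface with a known method would do, and Runnable is simply the standard one.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class RunnableContractDemo {

    // Hypothetical task type, standing in for Runnable.
    interface Job {
        String perform();
    }

    // Removes every queued job and invokes it through the one method
    // the interface guarantees. With a queue of plain Objects there
    // would be nothing to call here.
    static String drain(BlockingQueue<Job> queue) {
        StringBuilder out = new StringBuilder();
        Job job;
        while ((job = queue.poll()) != null) {
            out.append(job.perform()).append(' ');
        }
        return out.toString().trim();
    }

    public static void main(String[] args) {
        BlockingQueue<Job> queue = new LinkedBlockingQueue<>();
        queue.add(() -> "first");
        queue.add(() -> "second");
        System.out.println(drain(queue));
    }
}
```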

You should use one of the blocking methods instead of poll().

You can use poll(long timeout, TimeUnit unit), which takes a timeout parameter. If you call shutdownImmediately while a pool thread is waiting in this call, the thread waits out the rest of the timeout, poll returns null, and the thread then sees that shutdownSignal is set and leaves the loop. You can also call interrupt() on the threads if you don't want them to wait for the timeout.

 // in run method
 try {
     taskToPerform = blockingQueue.poll(5, TimeUnit.SECONDS);
 } catch (InterruptedException e) {
     break; // this thread is interrupted. Time to leave this loop
 }

// in shutdownImmediately method 
workers[i].shutdownSignal = true;
// Without interrupt(), the threads wait at most 5 seconds in this case.
workers[i].interrupt(); 
workers[i] = null;
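The difference between the two poll variants can be observed directly. This is only a rough sketch: the class and helper names are made up for the demonstration, and the exact counts and timings depend on the machine.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class PollVersusTimedPollDemo {

    // Counts how often the non-blocking poll() comes back empty
    // while spinning for the given number of milliseconds.
    static long countEmptyPolls(BlockingQueue<Runnable> queue, long windowMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(windowMillis);
        long emptyPolls = 0;
        while (System.nanoTime() < deadline) {
            if (queue.poll() == null) {
                emptyPolls++;
            }
        }
        return emptyPolls;
    }

    // Measures how long a single timed poll waits on an empty queue.
    static long timedPollMillis(BlockingQueue<Runnable> queue, long timeoutMillis)
            throws InterruptedException {
        long start = System.nanoTime();
        queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
        System.out.println("empty poll() calls in 50 ms: " + countEmptyPolls(queue, 50));
        System.out.println("one timed poll took ms: " + timedPollMillis(queue, 200));
    }
}
```

The first number is typically very large, because the loop never sleeps. The timed call parks the thread until an element arrives or the timeout expires.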

Or you can use take(), which blocks while the queue is empty. In this case you have to interrupt the threads that might be waiting in take() when you call shutdownImmediately; otherwise they will be stuck in the take() call.

// in run method
try {
   taskToPerform = blockingQueue.take();
} catch (InterruptedException e) {
   break; // this thread is interrupted. Time to leave this loop
}

// in shutdownImmediately method 
workers[i].shutdownSignal = true;
workers[i].interrupt(); // this is crucial for this case
workers[i] = null;
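Putting the take() variant together, a complete pool might look roughly like the sketch below. It is not the only way to do this and it makes a few choices beyond what is described above, all of them assumptions of this sketch: the class name BlockingThreadPool, a single volatile shutdown flag shared by all workers (volatile so that the write is guaranteed to be visible to the worker threads), rejecting tasks after shutdown, and an awaitTermination helper that joins the workers. The worker array is kept intact instead of being nulled so that the threads can still be joined.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;

class BlockingThreadPool {
    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
    private final Thread[] workers;
    // volatile: written by the caller of shutdownImmediately, read by workers
    private volatile boolean shutdown = false;

    BlockingThreadPool(int numOfThreads) {
        workers = new Thread[numOfThreads];
        for (int i = 0; i < numOfThreads; i++) {
            workers[i] = new Thread(this::workerLoop, "worker-" + i);
            workers[i].start();
        }
    }

    void execute(Runnable task) {
        if (shutdown) {
            throw new IllegalStateException("pool is shut down");
        }
        queue.add(task);
    }

    void shutdownImmediately() {
        shutdown = true;
        for (Thread worker : workers) {
            worker.interrupt(); // wakes workers parked in take()
        }
    }

    // Waits up to millisPerWorker (must be > 0) for each worker to exit.
    boolean awaitTermination(long millisPerWorker) throws InterruptedException {
        boolean allStopped = true;
        for (Thread worker : workers) {
            worker.join(millisPerWorker);
            allStopped &= !worker.isAlive();
        }
        return allStopped;
    }

    private void workerLoop() {
        while (!shutdown) {
            try {
                Runnable task = queue.take(); // parks while the queue is empty
                task.run();
            } catch (InterruptedException e) {
                break; // interrupted while waiting: leave the loop
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingThreadPool pool = new BlockingThreadPool(5);
        CountDownLatch done = new CountDownLatch(20);
        for (int i = 0; i < 20; i++) {
            pool.execute(() -> {
                System.out.println(Thread.currentThread().getName());
                done.countDown();
            });
        }
        done.await(); // wait for the tasks instead of sleeping
        pool.shutdownImmediately();
        System.out.println("all workers stopped: " + pool.awaitTermination(1000));
    }
}
```

Note that a task throwing an unchecked exception still kills its worker in this sketch, and tasks left in the queue at shutdown are simply dropped.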

Upvotes: 3
