arpitpanwar

Reputation: 309

Interrupting BlockingQueue take() in a thread pool

I have a thread pool which creates workers, and the workers take jobs from a BlockingQueue. The threads wait on take() from the queue. Even when I explicitly call interrupt() on the running threads, they are still waiting on take(). What is the right way of dealing with a BlockingQueue?

public class ThreadPoolGen {
    static final Logger LOG = Logger.getLogger(ThreadPoolGen.class);

    private LinkedBlockingQueue<Runnable> queue;
    private int threadCount;
    private Worker[] workers;
    private Thread[] workerThreads;

    public ThreadPoolGen(int count) throws CountException{

        if(isValidCount(count))
            this.threadCount = count;
        else
            throw new CountException("Invalid Thread Count");

        workers = new Worker[count];
        workerThreads = new Thread[count];
        queue = new LinkedBlockingQueue<Runnable>();
        startThreads();
    }

    public boolean execute(Runnable task){
        return queue.offer(task);
    }

    private void startThreads(){
        synchronized (this) {

            for(int i=0;i<threadCount;i++){

                workers[i] = new Worker();
                workerThreads[i] = new Thread(workers[i]);
                workerThreads[i].start();
            }
        }
    }

    public boolean shutDown(){
        try{
            for(Worker w: workers){

                w.thread.interrupt();

            }

            queue.clear();

            for(Thread workerThread : workerThreads){

                workerThread.interrupt();

            }

            return true;

        }catch(Exception e){
            LOG.debug(Thread.currentThread()+": Worker Thread Shutdown Failed");
            return false;

        }

    }

    private boolean isValidCount(int count){

        if(count<Integer.MAX_VALUE && count>0)
            return true;
        else
            return false;
    }

    private class Worker implements Runnable{

        final Thread thread;

        private Worker(){
            this.thread = Thread.currentThread();
        }

        @Override
        public void run() {
            try{
                while(true){
                    try{
                        Runnable r = queue.take();
                        r.run();
                    }catch(InterruptedException interrupt){
                        LOG.debug("Interrupted exception in: "+thread.getName());
                    }
                }
            }catch(Exception intr){

                this.thread.interrupt();

            }finally{

                this.thread.interrupt();
            }
        }
    }
}

The calling class :

public class Runner {
    public static void main(String[] args) {
        try {
            System.out.println("BeforeLaunch");
            ThreadPoolGen gen = new ThreadPoolGen(10);
            gen.execute(new Runnable() {

                @Override
                public void run() {
                    System.out.println("Inside Runnable");

                }
            });

            gen.shutDown();
        } catch (CountException ce) {
        } catch (Exception e) {
        }

    }

}

Upvotes: 0

Views: 1656

Answers (2)

Gray

Reputation: 116908

I have a thread pool which creates workers, and the workers take jobs from a BlockingQueue. The threads wait on take() from the queue. Even when I explicitly call interrupt() on the running threads, they are still waiting on take(). What is the right way of dealing with a BlockingQueue?

It seems to me that you are duplicating the behavior of an ExecutorService. Is there a reason for this? The basic usage looks like this:

ExecutorService threadPool = Executors.newFixedThreadPool(count);
...
threadPool.submit(new Runnable() ...);
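
To make the snippet above concrete, here is a minimal sketch of a fixed pool that is started, fed tasks, and shut down cleanly. The class name ExecutorDemo, the helper runTasks, and the 5 second timeout are illustrative assumptions, not part of the original question.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ExecutorDemo {

    // Hypothetical helper: runs taskCount trivial tasks and returns how many completed.
    static int runTasks(int threads, int taskCount) {
        ExecutorService threadPool = Executors.newFixedThreadPool(threads);
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < taskCount; i++) {
            threadPool.submit(() -> { completed.incrementAndGet(); });
        }
        // shutdown() stops accepting new tasks but lets already queued ones finish.
        threadPool.shutdown();
        try {
            if (!threadPool.awaitTermination(5, TimeUnit.SECONDS)) {
                // shutdownNow() interrupts the workers and drops tasks still in the queue.
                threadPool.shutdownNow();
            }
        } catch (InterruptedException e) {
            threadPool.shutdownNow();
            Thread.currentThread().interrupt();
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println("Completed: " + runTasks(10, 25));
    }
}
```

The pool handles the worker threads and their interruption itself, so no hand-written take() loop is needed.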

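Sometimes there is context that needs to be held by the running threads, but your classes still seem overly complex. You can still use an ExecutorService whose worker tasks share a BlockingQueue between the producer and the consumer threads. You could interrupt the threads when done, but you could also push count sentinel ("poison pill") objects into the queue and have your worker threads quit when they see one. Note that BlockingQueue implementations do not accept null elements (put(null) throws a NullPointerException), so the sentinel has to be a real object.

public class Worker implements Runnable {
     // sentinel that tells a worker to quit (assumes Work has a no-arg constructor)
     static final Work POISON_PILL = new Work();
     // some sort of context needed to be held by each runner
     public void run() {
         try {
             while (true) {
                 Work work = sharedQueue.take();
                 if (work == POISON_PILL) {
                     return;
                 }
                 // do the work ...
             }
         } catch (InterruptedException e) {
             // restore the interrupt status and let the thread exit
             Thread.currentThread().interrupt();
         }
     }
}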
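
The sentinel idea can be sketched end to end as follows. Because a BlockingQueue rejects null elements, the sketch uses a dedicated marker object instead; PoisonPillDemo, POISON_PILL, and runWithPoisonPills are hypothetical names chosen for illustration, and the jobs are plain Runnables rather than the Work type above.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class PoisonPillDemo {

    // Hypothetical sentinel: compared by identity, never executed.
    static final Runnable POISON_PILL = () -> { };

    // Starts workerCount workers, runs jobCount jobs, then stops every worker with one pill each.
    static int runWithPoisonPills(int workerCount, int jobCount) {
        BlockingQueue<Runnable> sharedQueue = new LinkedBlockingQueue<>();
        AtomicInteger done = new AtomicInteger();
        Thread[] workers = new Thread[workerCount];
        for (int i = 0; i < workerCount; i++) {
            workers[i] = new Thread(() -> {
                try {
                    while (true) {
                        Runnable work = sharedQueue.take();
                        if (work == POISON_PILL) {
                            return; // this worker is finished
                        }
                        work.run();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workers[i].start();
        }
        for (int i = 0; i < jobCount; i++) {
            sharedQueue.add(() -> done.incrementAndGet());
        }
        // The queue is FIFO, so the pills are taken only after all jobs have been taken.
        for (int i = 0; i < workerCount; i++) {
            sharedQueue.add(POISON_PILL);
        }
        try {
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return done.get();
    }

    public static void main(String[] args) {
        System.out.println("Jobs done: " + runWithPoisonPills(3, 20));
    }
}
```

Each worker consumes exactly one pill and exits, so the number of pills must match the number of workers.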

Upvotes: 1

Sotirios Delimanolis

Reputation: 280102

You're catching the exception within the while loop:

while (true) {
    try {
        Runnable r = queue.take();
        r.run();
    } catch (InterruptedException interrupt) {
        LOG.debug("Interrupted exception in: " + thread.getName());
    }
}

Any time you interrupt this thread, the catch block logs the exception and the loop simply runs again, so the thread goes straight back to take(). Get rid of this inner try-catch and let the outer one (outside the while) handle the InterruptedException.
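
A minimal sketch of that restructuring, with the try-catch moved outside the loop so that an interrupt ends the worker. The class InterruptibleWorkerDemo, the latch, and the 2 second join timeout are assumptions added only to make the example self-contained.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;

public class InterruptibleWorkerDemo {

    // Returns true if the worker thread terminated after being interrupted.
    static boolean workerStopsOnInterrupt() {
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
        CountDownLatch started = new CountDownLatch(1);
        Thread worker = new Thread(() -> {
            started.countDown();
            try {
                while (true) {
                    Runnable r = queue.take(); // throws InterruptedException when interrupted
                    r.run();
                }
            } catch (InterruptedException e) {
                // The catch is outside the loop, so control leaves the loop and run() returns.
            }
        });
        worker.start();
        try {
            started.await();   // make sure the worker is running
            worker.interrupt();
            worker.join(2000); // wait up to 2 seconds for it to exit
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !worker.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("Worker stopped: " + workerStopsOnInterrupt());
    }
}
```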

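Note that the interrupt might arrive while your thread is executing r.run() rather than while it is blocked in take(). In that case no InterruptedException is thrown unless the task itself calls a blocking method, and if the task catches the exception or clears the interrupt status, the worker will go back to take() and block again. You should possibly set a flag so that the same thread doesn't loop again once Runnable#run() is done.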
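
One way to sketch the flag idea is a volatile boolean that is checked on every loop iteration and set together with the interrupt. StopFlagDemo, Worker.stop(), and the sleeping task that swallows its interrupt are hypothetical and exist only to show the case where the interrupt alone would be lost.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;

public class StopFlagDemo {

    static class Worker implements Runnable {
        private final BlockingQueue<Runnable> queue;
        private volatile boolean running = true;
        private volatile Thread thread;

        Worker(BlockingQueue<Runnable> queue) {
            this.queue = queue;
        }

        // Sets the flag first, then interrupts in case the worker is blocked in take().
        void stop() {
            running = false;
            Thread t = thread;
            if (t != null) {
                t.interrupt();
            }
        }

        @Override
        public void run() {
            thread = Thread.currentThread(); // captured on the worker thread itself
            try {
                while (running) {
                    queue.take().run();
                }
            } catch (InterruptedException e) {
                // interrupted while waiting for work: fall through and exit
            }
        }
    }

    // Returns true if the worker exits even though the task swallowed the interrupt.
    static boolean stopsAfterSwallowedInterrupt() {
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
        Worker worker = new Worker(queue);
        CountDownLatch inTask = new CountDownLatch(1);
        queue.add(() -> {
            inTask.countDown();
            try {
                Thread.sleep(10_000);
            } catch (InterruptedException swallowed) {
                // the task swallows the interrupt, which also clears the interrupt status
            }
        });
        Thread t = new Thread(worker);
        t.start();
        try {
            inTask.await();  // the worker is now inside the task's run()
            worker.stop();
            t.join(2000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !t.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("Worker stopped: " + stopsAfterSwallowedInterrupt());
    }
}
```

Without the running check, the worker in this scenario would return to take() with its interrupt status cleared and block indefinitely.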

Upvotes: 1
