Reputation: 309
I have a thread pool which creates workers and the workers take the jobs from a BlockingQueue
.
The threads wait on take()
from the queue.
Even on explicitly calling the thread interrupt method for the running threads, they are still waiting on take()
. What is right way of dealing with blockingqueue
public class ThreadPoolGen {
static final Logger LOG = Logger.getLogger(ThreadPoolGen.class);
private LinkedBlockingQueue<Runnable> queue;
private int threadCount;
private Worker[] workers;
private Thread[] workerThreads;
public ThreadPoolGen(int count) throws CountException{
if(isValidCount(count))
this.threadCount = count;
else
throw new CountException("Invalid Thread Count");
workers = new Worker[count];
workerThreads = new Thread[count];
queue = new LinkedBlockingQueue<Runnable>();
startThreads();
}
public boolean execute(Runnable task){
return queue.offer(task);
}
private void startThreads(){
synchronized (this) {
for(int i=0;i<threadCount;i++){
workers[i] = new Worker();
workerThreads[i] = new Thread(workers[i]);
workerThreads[i].start();
}
}
}
public boolean shutDown(){
try{
for(Worker w: workers){
w.thread.interrupt();
}
queue.clear();
for(Thread workerThread : workerThreads){
workerThread.interrupt();
}
return true;
}catch(Exception e){
LOG.debug(Thread.currentThread()+": Worker Thread Shutdown Failed");
return false;
}
}
private boolean isValidCount(int count){
if(count<Integer.MAX_VALUE && count>0)
return true;
else
return false;
}
private class Worker implements Runnable{
final Thread thread;
private Worker(){
this.thread = Thread.currentThread();
}
@Override
public void run() {
try{
while(true){
try{
Runnable r = queue.take();
r.run();
}catch(InterruptedException interrupt){
LOG.debug("Interrupted exception in: "+thread.getName());
}
}
}catch(Exception intr){
this.thread.interrupt();
}finally{
this.thread.interrupt();
}
}
}
}
The calling class :
public class Runner {
public static void main(String[] args) {
try {
System.out.println("BeforeLaunch");
ThreadPoolGen gen = new ThreadPoolGen(10);
gen.execute(new Runnable() {
@Override
public void run() {
System.out.println("Inside Runnable");
}
});
gen.shutDown();
} catch (CountException ce) {
} catch (Exception e) {
}
}
}
Upvotes: 0
Views: 1656
Reputation: 116908
I have a thread pool which creates workers and the workers take the jobs from a BlockingQueue. The threads wait on take() from the queue. Even on explicitly calling the thread interrupt method for the running threads, they are still waiting on take(). What is right way of dealing with
BlockingQueue
.
Seems to me that you are duplicating the behavior of an ExecutorService
. Is there a reason for this? Here's the tutorial for them:
ExecutorService threadPool = Executors.newFixedThreadPool(count);
...
threadPool.submit(new Runnable() ...);
Sometimes there is context that needs to be held by the running threads but still it seems like your classes are overly complex. You can still use an ExecutorService
that share a BlockingQueue
between the producer and the consumer threads. You could interrupt the threads when done but you could also push count
number of null
objects into the queue and have your worker threads quit when they see the null
.
public class Worker implements Runnable {
// some sort of context needed to be held by each runner
public void run() {
while (true) {
Work work = sharedQueue.take();
if (work == null) {
return;
}
// do the work ...
}
}
}
Upvotes: 1
Reputation: 280102
You're catching the exception within the while
loop
while (true) {
try {
Runnable r = queue.take();
r.run();
} catch (InterruptedException interrupt) {
LOG.debug("Interrupted exception in: " + thread.getName());
}
}
Any time you interrupt this thread, it will simply loop again. Get rid of this try-catch
. Let the outer one (outside the while
) handle the InterruptedException
.
Note that you might get the interrupt
while your thread is executing run()
, in which case an InterruptedException
might not do what you expect it to. You should possibly set a flag so that the same thread doesn't loop again once the Runnable#run()
is done.
Upvotes: 1