Reputation: 9829
I have a very elementary thread pool. It keeps a pool of worker objects in a LinkedBlockingQueue. The code is just meant to print out the input data by recycling the worker objects.
It consistently deadlocks/freezes with the following code:
public class throttleheapthreadpool{
private quoteworkerobject[] channels;
private LinkedBlockingQueue<quoteworkerobject> idlechannels;
public throttleheapthreadpool(int poolsize,int stocks){
channels=new quoteworkerobject[poolsize];
idlechannels=new LinkedBlockingQueue<quoteworkerobject>();
for(int i=1;i<poolsize;i++){
channels[i]=new quoteworkerobject(idlechannels);
idlechannels.add(channels[i]);//All WORKERS to Idle pool to start
}
}
public void execute(Integer quote){
quoteworkerobject current = null;
try {
//extract worker from pool
current = (quoteworkerobject)idlechannels.take();
current.put(quote);
} catch (InterruptedException e) {
}
}
class quoteworkerobject{
LinkedBlockingQueue<Integer> taskqueue=new LinkedBlockingQueue<Integer>();
Thread quotethread=null;
LinkedBlockingQueue<quoteworkerobject> idle=null;
@SuppressWarnings("unchecked")
public quoteworkerobject(LinkedBlockingQueue<quoteworkerobject> idlechannels){
this.idle=idlechannels;
Runnable r=new Runnable(){
public void run() {
insertquote();
}
};
quotethread=new Thread(r);
quotethread.start();//spawn a thread from the worker
}
public void put(Integer quote){
taskqueue.add(quote);
}
public void insertquote(){
try{
Integer thisquote=taskqueue.take();
idle.add(this);
}
catch(Exception ex){
}
}
}
public static void main(String[] args){
throttleheapthreadpool pool=new throttleheapthreadpool(5,200);
Random randomGenerator = new Random();
for(int node=0;node < 20;node++){
int d=randomGenerator.nextInt(5*200);
pool.execute(d);
}
}
}
This code consistently freezes on the 8th execution, at the line current = (quoteworkerobject)idlechannels.take();
What is wrong in the above?
Upvotes: 1
Views: 1881
Reputation: 120998
This is exactly why I do not like (hate?) code like this. Consider making your life and ours easier by writing code that you could still look at after a few months and be proud of: name your variables properly, write a short comment or explanation, etc. It took me 25 minutes to refactor because I could not understand what was going on.
I refactored it a little and added a few numbered print statements; look at the code, the explanation is in the comments. The problem is in the insertquote method: it finishes too early.
import java.util.Random;
import java.util.concurrent.LinkedBlockingQueue;
public class Pool {
private Worker[] workers;
private LinkedBlockingQueue<Worker> workerQueue;
/**
* Create a pool of 5 workers and a {@link LinkedBlockingQueue} to store them
*/
public Pool(int poolsize) {
//1. First you get here : you create a Pool of 5 Worker Threads and a Queue to store them
System.out.println("1.");
workers = new Worker[poolsize];
workerQueue = new LinkedBlockingQueue<Worker>();
for (int i = 0; i < poolsize; i++) {
//2. You instantiate 5 worker Threads and place each of them on the Queue
System.out.println("2.");
workers[i] = new Worker(workerQueue);
workerQueue.add(workers[i]);
}
}
public void execute(Integer quote) {
Worker current = null;
try {
// extract worker from pool
//6. Get a worker from the Queue
System.out.println("6.");
current = workerQueue.take();
current.put(quote);
} catch (InterruptedException e) {
}
}
/**
*
*
*/
class Worker {
LinkedBlockingQueue<Integer> taskqueueForEachWorker = new LinkedBlockingQueue<Integer>();
LinkedBlockingQueue<Worker> workerQueue = null;
public Worker(LinkedBlockingQueue<Worker> idlechannels) {
new Thread(new Runnable() {
@Override
public void run() {
//3. You call the insert quote method
System.out.println("3.");
insertquote();
}
}).start();
}
public void put(Integer quote) {
//7. Add a task for each Thread to do
System.out.println("7.");
taskqueueForEachWorker.add(quote);
}
//TODO The problem is here: after workerQueue.add(this) executes, this method ends, so NO MORE worker Threads are put on the queue,
// thus at point 6 you block, because there are no more worker Threads on the queue and no one adds them.
public void insertquote() {
try {
// 4. You try to take an Integer from this worker's task queue, but there is nothing yet - it is empty, thus each Thread (worker)
// blocks here, waiting for a task
System.out.println("4.");
Integer thisquote = taskqueueForEachWorker.take(); // This will succeed only after 7.
workerQueue.add(this);
} catch (Exception ex) {
}
}
}
public static void main(String[] args) {
Pool pool = new Pool(5);
Random randomGenerator = new Random();
for (int node = 0; node < 20; node++) {
int d = randomGenerator.nextInt(5 * 200);
System.out.println("5.");
pool.execute(d);
}
}
}
The output is going to look like this (the exact interleaving of the worker threads' lines may vary): 1. 2. 3. 4. 2. 3. 4. 2. 3. 4. 2. 3. 4. 2. 3. 4. 5. 6. 7. 5. 6. 7. 5. 6. 7. 5. 6. 7. 5. 6. 7. 5. 6.
Note that the last line is 6. The program blocks there because insertquote has already exited in every worker, so nothing puts the workers back: the queue is now empty, and all the worker Threads have been taken.
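To make the blocking easier to observe without hanging the JVM, here is a minimal sketch of the same pattern. The names OneShotWorkerDemo, OneShotWorker and countAccepted are made up for this illustration, and it uses poll with a timeout instead of take so that the demo gives up instead of freezing. With 4 workers that each handle a single task, it should accept 8 submissions (4 that are processed, plus 4 that are handed to workers whose thread has already ended) before no idle worker shows up any more.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class OneShotWorkerDemo {

    /** Hypothetical worker whose thread handles exactly one task, like insertquote in the question. */
    static class OneShotWorker {
        final BlockingQueue<Integer> tasks = new LinkedBlockingQueue<>();

        OneShotWorker(BlockingQueue<OneShotWorker> idle) {
            Thread t = new Thread(() -> {
                try {
                    tasks.take();    // wait for the first (and only) task
                    idle.add(this);  // report back as idle, then the thread ends
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            t.setDaemon(true);       // do not keep the JVM alive for the demo
            t.start();
        }
    }

    /** Counts how many submissions found an idle worker before the timeout expired. */
    static int countAccepted(int workers, int submissions) throws InterruptedException {
        BlockingQueue<OneShotWorker> idle = new LinkedBlockingQueue<>();
        for (int i = 0; i < workers; i++) {
            idle.add(new OneShotWorker(idle));
        }
        int accepted = 0;
        for (int i = 0; i < submissions; i++) {
            // poll with a timeout instead of take(), so the demo cannot hang
            OneShotWorker w = idle.poll(1, TimeUnit.SECONDS);
            if (w == null) {
                break;               // this is where take() would block forever
            }
            w.tasks.add(i);
            accepted++;
        }
        return accepted;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("accepted submissions: " + countAccepted(4, 20));
    }
}
```

The second round of submissions is accepted only because the workers are back on the idle queue; their threads are gone, so those tasks are never consumed.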
Also, since each of your worker Threads uses its own separate queue, it seems to me you should look into the "work stealing" pattern, which is usually built on a Deque per worker.
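If you do not want to hand-roll work stealing, the JDK (Java 8 and later) already ships a work-stealing executor via Executors.newWorkStealingPool. The following is only a rough sketch of how quotes could be submitted to it; the class name WorkStealingSketch, the method processAll and the trivial "task" (returning the quote unchanged) are placeholders, not something taken from the question.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class WorkStealingSketch {

    /** Submits every quote to a work-stealing pool and returns the sum of the results. */
    static int processAll(List<Integer> quotes, int parallelism) throws Exception {
        ExecutorService pool = Executors.newWorkStealingPool(parallelism);
        try {
            List<Future<Integer>> results = new ArrayList<>();
            for (Integer quote : quotes) {
                // placeholder task: a real one would do the actual work on the quote
                results.add(pool.submit(() -> quote));
            }
            int sum = 0;
            for (Future<Integer> result : results) {
                sum += result.get(); // waits until this task has finished
            }
            return sum;
        } finally {
            pool.shutdown();         // stop accepting tasks and let the threads end
        }
    }

    public static void main(String[] args) throws Exception {
        List<Integer> quotes = new ArrayList<>();
        for (int i = 1; i <= 20; i++) {
            quotes.add(i);
        }
        System.out.println("sum of processed quotes: " + processAll(quotes, 4));
    }
}
```

Idle threads in such a pool take queued tasks from busy ones, so no explicit queue of idle workers is needed.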
Upvotes: 3
Reputation: 328795
See below my refactoring (still not perfect but slightly more readable). Your problem is as follows:
- the workers are created and added to the idle pool (only 4 of them, because the loop in the constructor of throttleheapthreadpool starts at 1)
- each worker runs insertquote once in a separate thread and goes back to the idle pool
So overall, you submit 4 jobs which get done and the workers go back to the queue; you then give them an additional 4 jobs (that's 8 in total), except that they don't consume those jobs because their insertquote method has exited.
Solution: run insertquote in a while loop:
public void insertquote() {
try {
while (true) {
taskqueue.take();
idle.add(this);
}
} catch (Exception ex) {
}
}
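As a self-contained way to check that the loop fixes the freeze, here is a minimal sketch built around the same idea. The class LoopingWorkerPool, the processed counter and the runDemo helper are assumptions added for this illustration, and the threads are marked as daemon threads so that the JVM can exit even though the workers loop forever.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class LoopingWorkerPool {
    private final BlockingQueue<Worker> idle = new LinkedBlockingQueue<>();
    private final AtomicInteger processed = new AtomicInteger();

    public LoopingWorkerPool(int poolSize) {
        for (int i = 0; i < poolSize; i++) {
            Worker worker = new Worker();
            idle.add(worker);        // every worker starts out idle
            worker.start();
        }
    }

    /** Blocks until a worker is idle, then hands it the quote. */
    public void execute(Integer quote) throws InterruptedException {
        Worker worker = idle.take();
        worker.tasks.add(quote);
    }

    class Worker {
        final BlockingQueue<Integer> tasks = new LinkedBlockingQueue<>();

        void start() {
            Thread t = new Thread(() -> {
                try {
                    while (true) {                   // keep serving tasks
                        tasks.take();                // wait for the next task
                        processed.incrementAndGet(); // stand-in for the real work
                        idle.add(this);              // become available again
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            t.setDaemon(true);       // assumption: lets the demo JVM exit
            t.start();
        }
    }

    /** Runs the given number of jobs and returns how many were processed. */
    static int runDemo(int poolSize, int jobs) throws InterruptedException {
        LoopingWorkerPool pool = new LoopingWorkerPool(poolSize);
        for (int i = 0; i < jobs; i++) {
            pool.execute(i);
        }
        // a worker only re-enters the idle queue after finishing its task,
        // so draining all workers means every job is done
        for (int i = 0; i < poolSize; i++) {
            pool.idle.take();
        }
        return pool.processed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("processed jobs: " + runDemo(4, 20));
    }
}
```

Catching InterruptedException specifically, and restoring the interrupt flag, also gives the worker a clean way to stop, which the catch-all in the original hides.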
For information, here is my current version of your code:
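import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class ThrottleHeapThreadPool {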
private final BlockingQueue<QuoteWorkerObject> idlechannels = new LinkedBlockingQueue<QuoteWorkerObject>();
public static void main(String[] args) {
ThrottleHeapThreadPool pool = new ThrottleHeapThreadPool(5, 200);
Random randomGenerator = new Random();
for (int node = 0; node < 20; node++) {
int d = randomGenerator.nextInt(5 * 200);
pool.execute(d);
}
}
public ThrottleHeapThreadPool(int poolsize, int stocks) {
for (int i = 1; i < poolsize; i++) {
QuoteWorkerObject worker = new QuoteWorkerObject(idlechannels);
idlechannels.add(worker);//All WORKERS to Idle pool to start
worker.init();
}
}
public void execute(Integer quote) {
try {
//extract worker from pool
QuoteWorkerObject worker = idlechannels.take();
worker.put(quote);
} catch (InterruptedException e) {
}
}
class QuoteWorkerObject {
private final BlockingQueue<Integer> taskqueue = new LinkedBlockingQueue<Integer>();
private final BlockingQueue<QuoteWorkerObject> idle;
@SuppressWarnings("unchecked")
public QuoteWorkerObject(BlockingQueue<QuoteWorkerObject> idlechannels) {
this.idle = idlechannels;
}
public void init() {
new Thread(new Runnable() {
public void run() {
insertquote();
}
}).start();
}
public void put(Integer quote) {
taskqueue.add(quote);
}
public void insertquote() {
try {
while (true) {
taskqueue.take();
idle.add(this);
}
} catch (Exception ex) {
}
}
}
}
Upvotes: 1