Reputation: 297
So, I'm pretty new to multi-threading and have been using this idea in all my programs lately. Before I start using it more I really want to make sure it is a correct efficient way to implement multi-threading using the Executor,CompletionService and a BlockingQueue plus an Observer. I'll provide example code below but let me first quickly explain how I think it works and maybe that will help.
The first thing I have is a BlockingQueue all tasks are added to this queue via an add(Task task) method. Upon creation of the class the run method is called with a while(true) calling take on the queue blocking until something gets added to the task queue.
Once something gets added to the queue inside the run() queue.take() returns the item on queue. Then I take that item and pass it to WorkerThread class that does stuff on it. That workerThread is added to the CompletionService pool which handles the waiting for a thread to finish.
Ok now comes the part i'm not sure is correct. I also have an inner class that implements runnable and is started when the class is initialized. Its job is to loop forever calling pool.take(). So, this essentially waits for one of the WorkerThreads to complete. I let the completion service handle this. Once the take() gets a value the inner class passes it to a notify observer method.
Is this okay implementation.? It concerns me a bit that there is the main classes run with a while(true) looping on task queue and an inner class also looping waiting on pool to receive a result from WorkerThread?
Here is an example implementation. What you think?
public class HttpSchedulerThreaded extends Observable implements Runnable {
private ArrayList<Object> list;//holds [0]=VULNINFO, [1]=REQUESTBUILDER OBJECT
protected static Logger logger = Logger.getLogger(HttpScheduler.class.getName());
private CompletionService<VulnInfo> pool;
private ExecutorService executor ;
private Thread responseWorkerThread;
private HttpSchedulerWorker schedulerWorker;
private boolean shouldRun = true;
private CountDownLatch doneSignal;
private String[] vulnClassesIgnoreRedirect;
private boolean followRedirects;
private boolean runJavascriptInResponse;
private boolean isSSL;
private int numThreadsInPool;
private BlockingQueue<VulnInfo> queue;
private boolean isRunning ;
public HttpSchedulerThreaded(int numThreads)
{
numThreadsInPool = numThreads;
executor = Executors.newFixedThreadPool(numThreads);
doneSignal = new CountDownLatch(numThreads);
pool = new ExecutorCompletionService<VulnInfo>(executor);
schedulerWorker = new HttpSchedulerWorker();
responseWorkerThread = new Thread(schedulerWorker);
queue = new LinkedBlockingQueue<VulnInfo>();
}
public HttpSchedulerThreaded()
{
numThreadsInPool = 1;
executor = Executors.newFixedThreadPool(1);
doneSignal = new CountDownLatch(1);
pool = new ExecutorCompletionService<VulnInfo>(executor);
schedulerWorker = new HttpSchedulerWorker();
responseWorkerThread = new Thread(schedulerWorker);
queue = new LinkedBlockingQueue<VulnInfo>();
}
public void setThreadCount(int numThreads)
{
if(!isRunning){
executor = Executors.newFixedThreadPool(numThreads);
doneSignal = new CountDownLatch(numThreads);
pool = new ExecutorCompletionService<VulnInfo>(executor);
numThreadsInPool = numThreads;
}
}
public void start()
{
if(!isRunning){
responseWorkerThread.start();
new Thread(this).start();
isRunning = true;
}
}
public void add(VulnInfo info) {
queue.add(info);
}
@Override
public void run() {
// TODO Auto-generated method stub
while(shouldRun)
{
try {
VulnInfo info = queue.take();
Callable<VulnInfo> worker = new HttpSchedulerRequestSender(info,followRedirects,runJavascriptInResponse,vulnClassesIgnoreRedirect,doneSignal);
//System.out.println("submitting to pooler: " + info.getID());
pool.submit(worker);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
/**
* Inner class of proxy is a worker thread blocks until the pool has transactions complete as soon as they
* are complete it will send them to server for completion.
* @author Steve
*
*/
class HttpSchedulerWorker implements Runnable{
public void run() {
// TODO Auto-generated method stub
while(true)
{
VulnInfo vulnInfo = null;
try {
//System.out.println("taking finished request");
Future<VulnInfo> tmp = pool.take();
// Future<VulnInfo> tmp = pool.poll();
if(tmp != null)
vulnInfo = tmp.get();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (ExecutionException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
if(vulnInfo != null)
{
//System.out.println("updating all observers: " + vulnInfo.getID());
updateObservers(vulnInfo);
}
}
}
}
Upvotes: 1
Views: 2987
Reputation: 3222
From my experience, your solution seems to be okay. I have three comments/suggestions:
responseWorkerThread = new Thread(schedulerWorker)
and responseWorkerThread.start()
, you've essentially broken apart those two loops. This part looks okay. You do seem to be using the Executor
s API correctly, but it does look like you may need some more code for stopping the HttpScheduledWorker
thread and for shutting down the ExecutionCompletionService
as part of the HttpSchedulerThreaded
class.queue
is really necessary. ExecutionCompletionService
already uses a BlockingQueue
to manage the tasks which are submitted to it.Upvotes: 2