Reputation: 3204
I have the following code for starting a threaded server:
Thread server = new Thread(new ServerRunnable(serverPort, devMode, messageQueue, database));
server.start();
Thread worker1 = new Thread(
new WorkerRunnable(
messageQueue,
database,
devMode,
1
));
Thread worker2 = new Thread(
new WorkerRunnable(
messageQueue,
database,
devMode,
2
));
Thread worker3 = new Thread(
new WorkerRunnable(
messageQueue,
database,
devMode,
3
));
worker1.start();
worker2.start();
worker3.start();
The ServerRunnable
passes a State object containing bytes read and other information into the messageQueue
.
The WorkerRunnable
threads take messages and process them.
I was looking at ThreadPoolExecutor
, hoping I could use it to replace the three worker threads above with a pool that could grow or shrink as required, but it doesn't work the way I expected. It wants a queue of tasks to complete.
My code is using Java NIO, so there is no guarantee that an item placed into the queue is complete, and so may require further processing. As a result I can't use ThreadPoolExecutor
in the way I first imagined, which would be passing the Runnable
to containing the queue to it.
So, it strikes me that if I want to use ThreadPoolExecutor
here I have to add it in the ServerRunnable
class (kind of invert my current process), and pass a new WorkerRunnable
class onto the ThreadPoolExecutor
queue after passing a message parameter to it.
Is that correct?
Something like this maybe:
LinkedBlockingQueue messageQueue = new LinkedBlockingQueue<Runnable>();
Thread server = new Thread(
new ServerRunnable(
serverPort,
devMode,
messageQueue,
database
));
server.start();
int corePoolSize = 5;
int maxPoolSize = 10;
long keepAliveTime = 5000;
ExecutorService threadPoolExecutor =
new ThreadPoolExecutor(
corePoolSize,
maxPoolSize,
keepAliveTime,
TimeUnit.MILLISECONDS,
messageQueue);
threadPoolExecutor.prestartAllCoreThreads()
Then in the ServerRunnable
class:
// Process incoming bytes into a message
messageQueue.put(new WorkerRunnable(database, devMode, message));
Is my understanding correct?
Upvotes: 1
Views: 672
Reputation: 41
Yes, either that or you can have single consumer to your queue which will keep passing the task to ExecutorService. Something like this
Thread consumer = new Thread(new Consumer(queue));
consumer.start();
class Consumer implements Runnable {
private ExecutorService service = Executors.newCachedThreadPool();
private final BlockingQueue queue;
public Consumer(Queue queue) {
this.queue = queue;
}
@Override
public void run() {
Task t = null;
while(t = queue.take()) {
Worker worker = new Worker(t);
service.execute(worker);
}
}
}
This is a bit loosely coupled with ExecutorService.
Upvotes: 1