Reputation: 1
It is mentioned by Java_author in section 5.3.1,
... many producer-consumer designs can be expressed using the
Executor
task execution framework, which itself uses the producer-consumer pattern.... The producer-consumer pattern offers a thread-friendly means of decomposing the problem into simpler components(if possible).
Does Executor framework implementation internally follow producer-consumer pattern?
If yes, How the idea of producer-consumer pattern helps in implementation of Executor framework?
Upvotes: 1
Views: 1488
Reputation: 38910
Check implementation of ThreadPoolExecutor
public void execute(Runnable command) {
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
Now check
private boolean addWorker(Runnable firstTask, boolean core) {
// After some checks, it creates Worker and start the thread
Worker w = new Worker(firstTask);
Thread t = w.thread;
// After some checks, thread has been started
t.start();
}
Implementation of Worker
:
/**
* Class Worker mainly maintains interrupt control state for
* threads running tasks, along with other minor bookkeeping.
* This class opportunistically extends AbstractQueuedSynchronizer
* to simplify acquiring and releasing a lock surrounding each
* task execution. This protects against interrupts that are
* intended to wake up a worker thread waiting for a task from
* instead interrupting a task being run. We implement a simple
* non-reentrant mutual exclusion lock rather than use ReentrantLock
* because we do not want worker tasks to be able to reacquire the
* lock when they invoke pool control methods like setCorePoolSize.
*/
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
final void runWorker(Worker w) {
Runnable task = w.firstTask;
w.firstTask = null;
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
clearInterruptsForTaskRun();
try {
beforeExecute(w.thread, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
Which Runnable
to execute is dependent on below logic.
/**
* Performs blocking or timed wait for a task, depending on
* current configuration settings, or returns null if this worker
* must exit because of any of:
* 1. There are more than maximumPoolSize workers (due to
* a call to setMaximumPoolSize).
* 2. The pool is stopped.
* 3. The pool is shutdown and the queue is empty.
* 4. This worker timed out waiting for a task, and timed-out
* workers are subject to termination (that is,
* {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
* both before and after the timed wait.
*
* @return task, or null if the worker must exit, in which case
* workerCount is decremented
*/
private Runnable getTask() {
// After some checks, below code returns Runnable
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
In Summary:
Producer adds Runnable
or Callable
in execute
API with workQueue.offer(command)
execute()
method creates Worker
thread if needed
This Worker
thread runs in infinite loop. It gets task (e.g. Runnable
) from getTask()
getTask()
pools on BlockingQueue<Runnable> workQueue)
and take Runnable
. It is consumer of BlockingQueue
.
Does Executor framework implementation internally follow producer-consumer pattern?
Yes as explained above.
If yes, How the idea of producer-consumer pattern helps in implementation of Executor framework?
BlockingQueue
implementation like ArrayBlockingQueue
and ExecutorService
implementationThreadPoolExecutor
are thread safe. Overhead on programmer on explicit implementation of synchronized, wait and notify calls to implement the same has been reduced.
Upvotes: 1
Reputation: 3714
Executor framework
uses producer-consumer
pattern.
From Wikipedia,
In computing, the producer–consumer problem (also known as the bounded-buffer problem) is a classic example of a multi-process synchronization problem. The problem describes two processes, the producer and the consumer, who share a common, fixed-size buffer used as a queue. The producer's job is to generate data, put it into the buffer, and start again. At the same time, the consumer is consuming the data (i.e., removing it from the buffer), one piece at a time. The problem is to make sure that the producer won't try to add data into the buffer if it's full and that the consumer won't try to remove data from an empty buffer.
If we have a look on different ExecutorService framework
implementations, more specifically ThreadPoolExecutor
class, it basically has the following:
Based on the type of the executor service, these parameters changes
For example,
LinkedBlockingQueue
and user configured no of threadsSynchronousQueue
and no of threads between 0
to Integer.MAX_VALUE
based on the number of submitted tasksUpvotes: 1