overexchange
overexchange

Reputation: 1

Executor framework - Producer Consumer pattern

It is mentioned by Java_author in section 5.3.1,

... many producer-consumer designs can be expressed using the Executor task execution framework, which itself uses the producer-consumer pattern.

... The producer-consumer pattern offers a thread-friendly means of decomposing the problem into simpler components(if possible).


Does Executor framework implementation internally follow producer-consumer pattern?

If yes, How the idea of producer-consumer pattern helps in implementation of Executor framework?

Upvotes: 1

Views: 1488

Answers (2)

Ravindra babu
Ravindra babu

Reputation: 38910

Check implementation of ThreadPoolExecutor

public void execute(Runnable command) {
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

Now check

private boolean addWorker(Runnable firstTask, boolean core) {
     // After some checks, it creates Worker and start the thread
    Worker w = new Worker(firstTask);
    Thread t = w.thread;

   // After some checks, thread has been started
   t.start();
}

Implementation of Worker:

  /**
     * Class Worker mainly maintains interrupt control state for
     * threads running tasks, along with other minor bookkeeping.
     * This class opportunistically extends AbstractQueuedSynchronizer
     * to simplify acquiring and releasing a lock surrounding each
     * task execution.  This protects against interrupts that are
     * intended to wake up a worker thread waiting for a task from
     * instead interrupting a task being run.  We implement a simple
     * non-reentrant mutual exclusion lock rather than use ReentrantLock
     * because we do not want worker tasks to be able to reacquire the
     * lock when they invoke pool control methods like setCorePoolSize.
     */
    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {

      /** Delegates main run loop to outer runWorker  */
       public void run() {
            runWorker(this);
       }

    final void runWorker(Worker w) {
          Runnable task = w.firstTask;
          w.firstTask = null;
          boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            clearInterruptsForTaskRun();
            try {
                beforeExecute(w.thread, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }

Which Runnable to execute is dependent on below logic.

/**
 * Performs blocking or timed wait for a task, depending on
 * current configuration settings, or returns null if this worker
 * must exit because of any of:
 * 1. There are more than maximumPoolSize workers (due to
 *    a call to setMaximumPoolSize).
 * 2. The pool is stopped.
 * 3. The pool is shutdown and the queue is empty.
 * 4. This worker timed out waiting for a task, and timed-out
 *    workers are subject to termination (that is,
 *    {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
 *    both before and after the timed wait.
 *
 * @return task, or null if the worker must exit, in which case
 *         workerCount is decremented
 */
private Runnable getTask() {
     // After some checks, below code returns Runnable

      try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
}

In Summary:

  1. Producer adds Runnable or Callable in execute API with workQueue.offer(command)

  2. execute() method creates Worker thread if needed

  3. This Worker thread runs in infinite loop. It gets task (e.g. Runnable) from getTask()

  4. getTask() pools on BlockingQueue<Runnable> workQueue) and take Runnable. It is consumer of BlockingQueue.

Does Executor framework implementation internally follow producer-consumer pattern?

Yes as explained above.

If yes, How the idea of producer-consumer pattern helps in implementation of Executor framework?

BlockingQueue implementation like ArrayBlockingQueue and ExecutorService implementationThreadPoolExecutor are thread safe. Overhead on programmer on explicit implementation of synchronized, wait and notify calls to implement the same has been reduced.

Upvotes: 1

Somnath Musib
Somnath Musib

Reputation: 3714

Executor framework uses producer-consumer pattern.

From Wikipedia,

In computing, the producer–consumer problem (also known as the bounded-buffer problem) is a classic example of a multi-process synchronization problem. The problem describes two processes, the producer and the consumer, who share a common, fixed-size buffer used as a queue. The producer's job is to generate data, put it into the buffer, and start again. At the same time, the consumer is consuming the data (i.e., removing it from the buffer), one piece at a time. The problem is to make sure that the producer won't try to add data into the buffer if it's full and that the consumer won't try to remove data from an empty buffer.

If we have a look on different ExecutorService framework implementations, more specifically ThreadPoolExecutor class, it basically has the following:

  1. A queue, where the jobs are submitted and held
  2. Number of threads which consumes the tasks submitted to the queue.

Based on the type of the executor service, these parameters changes

For example,

  • Fixed thread pool uses a LinkedBlockingQueue and user configured no of threads
  • Cached Thread pool uses a SynchronousQueue and no of threads between 0 to Integer.MAX_VALUE based on the number of submitted tasks

Upvotes: 1

Related Questions