Piero

Reputation: 9273

ExecutorService with Runnables sharing a CyclicBarrier

I have a problem with a concurrent solution in Java that uses n Runnable objects sharing a CyclicBarrier. The Runnables are executed by an ExecutorService. This is the code:

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class Worker implements Runnable {
   private CyclicBarrier writeBarrier;
   private int index;
   int valuetocalculate; //package-private so that Chief can read it

   public Worker(int i,CyclicBarrier writeBarrier)
   {
      this.writeBarrier = writeBarrier;
      this.index = i;
      this.valuetocalculate = 0;
   }

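   public void run() {

      //calculations with valuetocalculate
      try {
         writeBarrier.await(); //wait until all workers have reached this point
      } catch (InterruptedException | BrokenBarrierException e) {
         return; //this thread was interrupted or the barrier was broken
      }
      //write new valuetocalculate value
   }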
}

import java.util.ArrayList;
import java.util.concurrent.CyclicBarrier;

public class Context {

    private ArrayList<Worker> workers;
    private Chief chief;

    public Context()
    {
       workers = new ArrayList<Worker>();
       chief = new Chief();
    }

    public void generateRandomWorkers(int nworkers)
    {
       CyclicBarrier writeBarrier = new CyclicBarrier(nworkers); //one party per worker
       chief.setBarrier(writeBarrier);
       //generate random workers
       for (int i = 0; i<nworkers;i++)
       {
           Worker worker = new Worker(i,writeBarrier);
           workers.add(worker);
       }
       chief.setWorkersArray(workers);
       chief.start();
     }
}

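import java.util.ArrayList;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Chief extends Thread {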

    private CyclicBarrier writeBarrier;
    private ArrayList<Worker> workers;
    private ExecutorService executor;
    private int cores;
    private volatile boolean stop = false; //never set to true while testing

    public Chief ()
    {
       cores = Runtime.getRuntime().availableProcessors()+1;
    }

    public void setBarrier (CyclicBarrier writeBarrier)
    {
       this.writeBarrier = writeBarrier;
    }

    public void setWorkersArray(ArrayList<Worker> workers)
    {
       this.workers = workers;
    }

    public ArrayList<Integer> getvaluetocalculate()
    {
        ArrayList<Integer> values = new ArrayList<Integer> ();
        for (int i = 0; i<workers.size();i++)
        {
           values.add(workers.get(i).valuetocalculate);
        }

         return values;
    }

    public void run(){
       while (!stop) //always true for testing
       {
          getvaluetocalculate();
          //make calculations
          writeBarrier.reset();
          executor = Executors.newFixedThreadPool(cores);
          for (int i = 0;i<workers.size();i++)
          {
             Runnable runnable = workers.get(i);
             executor.execute(runnable);
          }
          executor.shutdown();
          while (!executor.isTerminated())
          {
          }
       }
    }
}

Everything is started in main with:

Context context = new Context();
context.generateRandomWorkers(nworkers);

The problem is that the Runnables never get past the first "iteration" of the loop in the Chief's run(), so it seems that the Workers never get past writeBarrier.await();. If instead I initialize this:

executor = Executors.newFixedThreadPool(cores);

with workers.size() instead of cores, it works, but it does not seem to be synchronized. How can I solve this?

Upvotes: 4

Views: 2338

Answers (1)

Mike Q

Reputation: 23229

OK, so it looks like you are trying to do the following:

  • One chief/scheduler which controls the workers
  • One or more workers doing calculations
  • Chief executes all workers
  • Chief waits for all workers to complete
  • Chief gets results from each Worker to calculate result

Assuming the above, here are your problems:

  • Calling barrier.await() in the worker's run() method blocks the pool thread, so it is never released back to the pool to run subsequent workers. Therefore, when pool size < worker size, the first 'pool size' workers occupy all the threads and then wait indefinitely for the others, which never get a thread to run on. This is why not all your workers run unless you change the pool size to workers.size().
  • The valuetocalculate variable is not synchronised between the worker setting the result and the chief reading it so you might be seeing stale results.
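
The first problem can be reproduced in isolation. The following is a minimal sketch, not the asker's code: the class name BarrierStarvationDemo and the method workersPastBarrier are made up for illustration, and the one-second timeout on await() is an assumption added only so that the demo terminates instead of hanging the way the original does.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; none of these names come from the question.
final class BarrierStarvationDemo {

    // Runs workerCount tasks on a pool of poolSize threads and returns
    // how many of them managed to get past the shared barrier.
    static int workersPastBarrier(int poolSize, int workerCount) {
        CyclicBarrier barrier = new CyclicBarrier(workerCount);
        AtomicInteger passed = new AtomicInteger();
        ExecutorService executor = Executors.newFixedThreadPool(poolSize);
        for (int i = 0; i < workerCount; i++) {
            executor.execute(() -> {
                try {
                    // Timeout is an assumption for the demo; the original awaits forever.
                    barrier.await(1, TimeUnit.SECONDS);
                    passed.incrementAndGet();
                } catch (InterruptedException | BrokenBarrierException | TimeoutException e) {
                    // The barrier never tripped because not all parties could run at once.
                }
            });
        }
        executor.shutdown();
        try {
            executor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return passed.get();
    }

    public static void main(String[] args) {
        System.out.println("pool 2, workers 4: " + workersPastBarrier(2, 4));
        System.out.println("pool 4, workers 4: " + workersPastBarrier(4, 4));
    }
}
```

With a pool smaller than the number of barrier parties, no worker should get through, because the queued workers cannot start until a waiting one finishes. With a pool as large as the worker count, all of them should pass.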

The correct way to implement this sort of system is to have the workers implement Callable, where call() returns the result once the worker has calculated it. This takes care of publishing the results back to your chief (you'll see below how this works).
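
As a rough sketch of what such a worker might look like: the class name CallableWorker and the squaring "calculation" are placeholders assumed for illustration, since the question does not say what is actually computed.

```java
import java.util.concurrent.Callable;

// Hypothetical rewrite of the question's Worker; the name and the
// calculation are assumptions, not taken from the original code.
final class CallableWorker implements Callable<Integer> {
    private final int index;

    CallableWorker(int index) {
        this.index = index;
    }

    @Override
    public Integer call() {
        // Placeholder calculation. The result is returned to the caller
        // instead of being written to a field that the chief reads later.
        return index * index;
    }
}
```

Because the value travels back through the return value, the worker needs no shared mutable field and no barrier.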

Remove the cyclic barrier, you don't need that.

Create the executor as you are now and call invokeAll() with a list of Callables (your workers). This method runs the Callables on the executor and blocks until all of them have completed, at which point it returns a List<Future<T>>. Each Future corresponds to one of the workers/Callables you passed in. Iterate over the list, pulling the results out. If a worker has failed, calling get() on its Future will throw an ExecutionException wrapping the original cause.
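
One round of the approach described above might be sketched as follows. The class name InvokeAllSketch, the method runRound and the "index * 10" calculation are all assumptions for illustration; only invokeAll(), Future.get() and the executor factory are real java.util.concurrent API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch of the chief's loop body; names are illustrative.
final class InvokeAllSketch {

    // Runs one round of workers and returns their results in submission order.
    static List<Integer> runRound(int workerCount, int poolSize) {
        List<Callable<Integer>> workers = new ArrayList<>();
        for (int i = 0; i < workerCount; i++) {
            final int index = i;
            workers.add(() -> index * 10); // placeholder calculation
        }
        ExecutorService executor = Executors.newFixedThreadPool(poolSize);
        List<Integer> results = new ArrayList<>();
        try {
            // Blocks until every worker has completed, normally or with an exception.
            List<Future<Integer>> futures = executor.invokeAll(workers);
            for (Future<Integer> future : futures) {
                results.add(future.get()); // throws ExecutionException if that worker failed
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("a worker failed", e.getCause());
        } finally {
            executor.shutdown();
        }
        return results;
    }

    public static void main(String[] args) {
        // Five workers on two threads: the pool can be smaller than the worker count.
        System.out.println(runRound(5, 2));
    }
}
```

Since no worker blocks waiting for another, the pool size no longer has to match the number of workers. In a long-running chief it would probably make sense to create the executor once and reuse it across rounds rather than building a new one each time.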

Hope that helps.

Upvotes: 1
