Reputation: 73
Looking at the javadocs for CyclicBarrier I found the following statement in the class documentation that I dont completely understand. From the javadoc:
If the barrier action does not rely on the parties being suspended when it is executed, then any of the threads in the party could execute that action when it is released. To facilitate this, each invocation of await() returns the arrival index of that thread at the barrier. You can then choose which thread should execute the barrier action, for example:
if (barrier.await() == 0) {
// log the completion of this iteration
}
Can someone explain how to designate a specific thread for execution of the barrier action once all the parties have called .await() and perhaps provide an example?
Upvotes: 7
Views: 2151
Reputation: 2991
OK, pretend RuPaul wanted some worker threads, but only the 3rd one that finished is supposed to do the barrier task (Say "Sashay, Chante").
import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
public class Main
{
private static class Worker implements Runnable {
private CyclicBarrier barrier;
public Worker(CyclicBarrier b) {
barrier = b;
}
public void run() {
final String threadName = Thread.currentThread().getName();
System.out.printf("%s: You better work!%n", threadName);
// simulate the workin' it part
Random rnd = new Random();
int secondsToWorkIt = rnd.nextInt(10) + 1;
try {
TimeUnit.SECONDS.sleep(secondsToWorkIt);
} catch (InterruptedException ex) { /* ...*/ }
System.out.printf("%s worked it, girl!%n", threadName);
try {
int n = barrier.await();
final int myOrder = barrier.getParties() - n;
System.out.printf("Turn number: %s was %s%n", myOrder, threadName);
// MAGIC CODE HERE!!!
if (myOrder == 3) { // the third one that finished
System.out.printf("%s: Sashay Chante!%n", myOrder);
}
// END MAGIC CODE
}
catch (BrokenBarrierException ex) { /* ... */ }
catch (InterruptedException ex) { /* ... */ }
}
}
private final int numThreads = 5;
public void work() {
/*
* I want the 3rd thread that finished to say "Sashay Chante!"
* when everyone has called await.
* So I'm not going to put my "barrier action" in the CyclicBarrier constructor,
* where only the last thread will run it! I'm going to put it in the Runnable
* that calls await.
*/
CyclicBarrier b = new CyclicBarrier(numThreads);
for (int i= 0; i < numThreads; i++) {
Worker task = new Worker(b);
Thread thread = new Thread(task);
thread.start();
}
}
public static void main(String[] args)
{
Main main = new Main();
main.work();
}
}
Here is an example of the output:
Thread-0: You better work!
Thread-4: You better work!
Thread-2: You better work!
Thread-1: You better work!
Thread-3: You better work!
Thread-1 worked it, girl!
Thread-4 worked it, girl!
Thread-0 worked it, girl!
Thread-3 worked it, girl!
Thread-2 worked it, girl!
Turn number: 5 was Thread-2
Turn number: 3 was Thread-0
3: Sashay Chante!
Turn number: 1 was Thread-1
Turn number: 4 was Thread-3
Turn number: 2 was Thread-4
As you can see, the thread that finished 3rd was Thread-0, so Thread-0 was the one that did the "barrier action".
Say you are able to name your threads:
thread.setName("My Thread " + i);
Then you can perform the action on the thread of that name...I don't know how feasible that is for you.
Upvotes: 4
Reputation: 17648
CyclicBarrier enables designating a Thread by ORDER :
Designating a thread that returns at a SPECIFIC order is possible if, as you say, you enclose the barrier completion logic in a conditional which is specific to a thread index. Thus, your implementation above will work according to the documentation you cited.
However, the point of confusion here - is that the documentation is talking about thread identity in terms of order of returning to the barrier, rather than thread object identity. Thus, thread 0 refers to the 0th thread to complete.
Alternative : Designating a Thread using other mechanisms.
If you wanted to have a specific thread carry on a specific action after other works completed, you might use a different mechanism - like a semaphore , for example. If you desired this behavior, you may not really need the cyclic barrier.
To inspect what is meant by the documentation, run the class (modified from http://programmingexamples.wikidot.com/cyclicbarrier) below , where ive incorporated your snippet.
Example of what is meant by the docs for the CyclicBarrier
package thread; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierExample
{
private static int matrix[][] =
{
{ 1 },
{ 2, 2 },
{ 3, 3, 3 },
{ 4, 4, 4, 4 },
{ 5, 5, 5, 5, 5 } };
static final int rows = matrix.length;
private static int results[]=new int[rows];
static int threadId=0;
private static class Summer extends Thread
{
int row;
CyclicBarrier barrier;
Summer(CyclicBarrier barrier, int row)
{
this.barrier = barrier;
this.row = row;
}
public void run()
{
int columns = matrix[row].length;
int sum = 0;
for (int i = 0; i < columns; i++)
{
sum += matrix[row][i];
}
results[row] = sum;
System.out.println("Results for row " + row + " are : " + sum);
// wait for the others
// Try commenting the below block, and watch what happens.
try
{
int w = barrier.await();
if(w==0)
{
System.out.println("merging now !");
int fullSum = 0;
for (int i = 0; i < rows; i++)
{
fullSum += results[i];
}
System.out.println("Results are: " + fullSum);
}
}
catch(Exception e)
{
e.printStackTrace();
}
}
}
public static void main(String args[])
{
/*
* public CyclicBarrier(int parties,Runnable barrierAction)
* Creates a new CyclicBarrier that will trip when the given number
* of parties (threads) are waiting upon it, and which will execute
* the merger task when the barrier is tripped, performed
* by the last thread entering the barrier.
*/
CyclicBarrier barrier = new CyclicBarrier(rows );
for (int i = 0; i < rows; i++)
{
System.out.println("Creating summer " + i);
new Summer(barrier, i).start();
}
System.out.println("Waiting...");
}
}
Upvotes: 1
Reputation: 47183
I think that section of the documentation is about an alternative to the barrier action Runnable
, not a particular way of using it. Note how it says (emphasis mine):
If the barrier action does not rely on the parties being suspended when it is executed
If you specify a barrier action as a runnable, then it ...
is run once per barrier point, after the last thread in the party arrives, but before any threads are released
So, while the threads are suspended (although since it's run by the last thread to arrive, that one isn't suspendd; but at least its normal flow of execution is suspended until the barrier action finishes).
The business about using the return value of await()
is something you can do if you don't need your action to run while the threads are suspended.
The documentation's examples are indicative. The example using a Runnable
barrier action is coordinating the work of some other threads - merging the rows and checking if the job is done. The other threads need to wait for it to know if they have more work to do. So, it has to run while they're suspended. The example using the return value from await()
is some logging. The other threads don't depend on the logging having being done. So, it can happen while the other threads have started doing more work.
Upvotes: 4