Reputation: 704
My purpose is very simple. I want to use CyclicBarrier and its reset() method to run 3 threads 4 times, using the code below. I have researched every resource I could find on the net, in Java Concurrency in Practice, and in Thinking in Java, but could not get it working the way I want.
Thinking in Java has a solution of this kind in HorseRace.java, but it uses an ExecutorService. I want to do it the hard way, using only CyclicBarrier and its reset() method. Here is my code along with its output; it runs to the end but throws BrokenBarrierException after the reset() call.
package com.apal.barrier;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierEx1 {
    CyclicBarrier cb;
    public static int count = 0;

    public static void main(String[] args) {
        new CyclicBarrierEx1().manageThread();
    }

    private void manageThread() {
        cb = new CyclicBarrier(3, new Runnable() {
            @Override
            public void run() {
                if (count == 3) {
                    System.out.println("Exit from system");
                    return;
                }
                System.out.println("Collating task");
                cb.reset();
                for (int i = 0; i < 3; i++) {
                    new Thread(new Worker(cb)).start();
                }
                count++;
            }
        });
        for (int i = 0; i < 3; i++) {
            new Thread(new Worker(cb)).start();
        }
    }
}

class Worker implements Runnable {
    CyclicBarrier cb;

    public Worker(CyclicBarrier cb) {
        this.cb = cb;
    }

    @Override
    public void run() {
        doSomeWork();
        try {
            cb.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
    }

    private void doSomeWork() {
        System.out.println("Doing some work ");
    }
}
Sample output
Doing some work
Doing some work
Doing some work
Collating task
Doing some work
Doing some work
Doing some work
Collating task
Doing some work
java.util.concurrent.BrokenBarrierExceptionDoing some work
Doing some work
Collating task
Doing some work
at java.util.concurrent.CyclicBarrier.dowait(Unknown Source)
at java.util.concurrent.CyclicBarrier.await(Unknown Source)
at com.apal.barrier.Worker.run(CyclicBarrierEx1.java:48)
at java.lang.Thread.run(Unknown Source)
java.util.concurrent.BrokenBarrierException
at java.util.concurrent.CyclicBarrier.dowait(Unknown Source)
at java.util.concurrent.CyclicBarrier.await(Unknown Source)
Doing some work at com.apal.barrier.Worker.run(CyclicBarrierEx1.java:48)
at java.lang.Thread.run(Unknown Source)
Doing some work
Exit from systemjava.util.concurrent.BrokenBarrierException
at java.util.concurrent.CyclicBarrier.dowait(Unknown Source)
at java.util.concurrent.CyclicBarrier.await(Unknown Source)
at com.apal.barrier.Worker.run(CyclicBarrierEx1.java:48)
at java.lang.Thread.run(Unknown Source)
java.util.concurrent.BrokenBarrierException
at java.util.concurrent.CyclicBarrier.dowait(Unknown Source)
at java.util.concurrent.CyclicBarrier.await(Unknown Source)
at com.apal.barrier.Worker.run(CyclicBarrierEx1.java:48)
at java.lang.Thread.run(Unknown Source)
java.util.concurrent.BrokenBarrierException
at java.util.concurrent.CyclicBarrier.dowait(Unknown Source)
at java.util.concurrent.CyclicBarrier.await(Unknown Source)
at com.apal.barrier.Worker.run(CyclicBarrierEx1.java:48)
at java.lang.Thread.run(Unknown Source)
java.util.concurrent.BrokenBarrierException
at java.util.concurrent.CyclicBarrier.dowait(Unknown Source)
at java.util.concurrent.CyclicBarrier.await(Unknown Source)
at com.apal.barrier.Worker.run(CyclicBarrierEx1.java:48)
at java.lang.Thread.run(Unknown Source)
Upvotes: 4
Views: 3866
Reputation: 704
I took the idea from MichaelBurr's hint: check the number of waiting threads in the Worker thread before resetting the CyclicBarrier synchronizer. I am posting my own answer because I really wanted to use CyclicBarrier and its reset() method to run a set of threads multiple times and collate their results, as in matrix manipulation.
package com.apal.barrier;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierEx1 {
    CyclicBarrier cb;
    public static int count = 0;

    public static void main(String[] args) {
        new CyclicBarrierEx1().manageThread();
    }

    private void manageThread() {
        cb = new CyclicBarrier(3, new Runnable() {
            @Override
            public void run() {
                if (count == 3) {
                    System.out.println("Exit from system");
                    return;
                }
                System.out.println("Collating task");
                count++;
                // cb.reset(); **Commented and replaced in Worker**
                for (int i = 0; i < 3; i++) {
                    new Thread(new Worker(cb)).start();
                }
            }
        });
        for (int i = 0; i < 3; i++) {
            new Thread(new Worker(cb)).start();
        }
    }
}

class Worker implements Runnable {
    CyclicBarrier cb;

    public Worker(CyclicBarrier cb) {
        this.cb = cb;
    }

    @Override
    public void run() {
        doSomeWork();
        try {
            cb.await();
            //if (cb.getNumberWaiting() == 0) // **if no one is waiting, then reset it.**
            //    cb.reset();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
    }

    private void doSomeWork() {
        System.out.println("Doing some work ");
    }
}
Sample output
Doing some work
Doing some work
Doing some work
Collating task
Doing some work
Doing some work
Doing some work
Collating task
Doing some work
Doing some work
Doing some work
Collating task
Doing some work
Doing some work
Doing some work
Exit from system
From the Javadoc for reset():
Resets the barrier to its initial state. If any parties are currently waiting at the barrier, they will return with a BrokenBarrierException. Note that resets after a breakage has occurred for other reasons can be complicated to carry out; threads need to re-synchronize in some other way, and choose one to perform the reset. It may be preferable to instead create a new barrier for subsequent use.
So reset() causes any currently waiting threads to wake immediately and throw a BrokenBarrierException. reset() is meant for when you want to "break" the barrier.
One should never need to call reset() in normal circumstances, because the barrier becomes reusable on its own once all waiting parties have been released.
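To illustrate reuse without reset(), here is a minimal sketch, assuming three long-lived worker threads that each loop four times instead of spawning fresh threads per round. The class name BarrierReuseDemo and the method runRounds are hypothetical names chosen for this example; they do not appear in the question.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

class BarrierReuseDemo {

    // Hypothetical helper: runs `parties` threads through `rounds` barrier
    // cycles and returns how many times the barrier action was executed.
    static int runRounds(int parties, int rounds) {
        AtomicInteger collations = new AtomicInteger();

        // The barrier action runs once per cycle, in the last thread to arrive.
        CyclicBarrier barrier = new CyclicBarrier(parties, () ->
                System.out.println("Collating task " + collations.incrementAndGet()));

        Thread[] workers = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            workers[i] = new Thread(() -> {
                try {
                    for (int round = 0; round < rounds; round++) {
                        System.out.println("Doing some work");
                        // No reset() needed: the same barrier is reused each round.
                        barrier.await();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } catch (BrokenBarrierException e) {
                    throw new IllegalStateException(e);
                }
            });
            workers[i].start();
        }

        // Wait for all workers so the returned count is final.
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return collations.get();
    }

    public static void main(String[] args) {
        System.out.println("Barrier action ran " + runRounds(3, 4) + " times");
    }
}
```

Keeping the same three threads alive across rounds is a design choice of this sketch; the code in this thread starts new threads after each round instead, which also works as long as reset() is not called while parties are waiting.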
Upvotes: 0
Reputation: 340218
You have a race condition when the barrier action that is passed in the CyclicBarrier constructor is run. The docs for CyclicBarrier.await() say this about how that action method is run (emphasis added):
If the current thread is the last thread to arrive, and a non-null barrier action was supplied in the constructor, then the current thread runs the action before allowing the other threads to continue.
This means that the barrier action's call to reset() can occur while those other threads are still waiting on the barrier. That results in the BrokenBarrierException.
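The effect can be reproduced in isolation. The following is a small sketch, assuming a two-party barrier where only one party ever arrives, so the waiting thread is guaranteed to be parked when reset() is called. ResetBreaksWaitersDemo and waiterSawBrokenBarrier are hypothetical names used only for this illustration.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicBoolean;

class ResetBreaksWaitersDemo {

    // Hypothetical helper: returns true if the waiting thread was released
    // with a BrokenBarrierException after reset() was called.
    static boolean waiterSawBrokenBarrier() {
        // Two parties, but only one thread will ever call await().
        CyclicBarrier barrier = new CyclicBarrier(2);
        AtomicBoolean broken = new AtomicBoolean(false);

        Thread waiter = new Thread(() -> {
            try {
                barrier.await();
            } catch (BrokenBarrierException e) {
                broken.set(true); // reset() woke us up with a broken barrier
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.start();

        try {
            // Poll until the waiter has actually arrived at the barrier.
            while (barrier.getNumberWaiting() < 1) {
                Thread.sleep(10);
            }
            barrier.reset(); // breaks the current generation of waiters
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return broken.get();
    }

    public static void main(String[] args) {
        System.out.println("Waiter saw BrokenBarrierException: " + waiterSawBrokenBarrier());
    }
}
```

In the question's code the same thing happens inside the barrier action: the last thread to arrive calls reset() while the other two are still parked in await().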
See the paragraph in the documentation that starts with "If the barrier action does not rely on the parties being suspended when it is executed, then any of the threads in the party could execute that action when it is released". Using that technique, you can do the work you're currently doing in the barrier action inside exactly one of the worker threads, just after it is released from await(). The following is an untested attempt (note: I also moved where the count variable is incremented, to avoid a race condition in which worker threads might complete before count gets incremented):
package com.apal.barrier;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierEx1 {
    CyclicBarrier cb;
    public static int count = 0;

    public static void main(String[] args) {
        new CyclicBarrierEx1().manageThread();
    }

    // Called by exactly one worker per round, after it is released from await().
    public static void barrierComplete(CyclicBarrier cb) {
        System.out.println("Collating task");
        if (count == 3) {
            System.out.println("Exit from system");
            return;
        }
        // Increment before starting the next round's workers.
        count++;
        for (int i = 0; i < 3; i++) {
            new Thread(new Worker(cb)).start();
        }
    }

    private void manageThread() {
        // No barrier action: the collation work is done by a released worker instead.
        cb = new CyclicBarrier(3);
        for (int i = 0; i < 3; i++) {
            new Thread(new Worker(cb)).start();
        }
    }
}

class Worker implements Runnable {
    CyclicBarrier cb;

    public Worker(CyclicBarrier cb) {
        this.cb = cb;
    }

    @Override
    public void run() {
        doSomeWork();
        try {
            // await() returns the arrival index; 0 means this thread arrived last.
            if (cb.await() == 0) {
                CyclicBarrierEx1.barrierComplete(cb);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
    }

    private void doSomeWork() {
        System.out.println("Doing some work ");
    }
}
Upvotes: 2