Reputation: 3300
It is well known that Phaser can be used to synchronize the start time of all tasks as mentioned in the JavaDocs and this blog by Niklas Schlimm.
Niklas has drawn a pretty understandable image of the synchronization:
       |Phaser   |Phaser   |Phaser   |
Task 1 | ------> | ------> | ------> | ...
Task 2 | ------> | ------> | ------> | ...
...
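As a minimal sketch of the picture above (the class name `PhaserStartDemo` and the helper `run` are made up for illustration, not taken from the blog or the JavaDocs), each task does its work for a round and then waits at the phaser until all other tasks have finished the same round:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicIntegerArray;

public class PhaserStartDemo {

    /** Returns, for each round, how many tasks had finished that round when task 0 passed the barrier. */
    static List<Integer> run(int tasks, int rounds) {
        final Phaser phaser = new Phaser(tasks);                 // one registered party per task
        final AtomicIntegerArray done = new AtomicIntegerArray(rounds);
        final List<Integer> seen = Collections.synchronizedList(new ArrayList<>());
        final Thread[] threads = new Thread[tasks];
        for (int t = 0; t < tasks; t++) {
            final int id = t;
            threads[t] = new Thread(() -> {
                for (int r = 0; r < rounds; r++) {
                    done.incrementAndGet(r);                     // stand-in for this task's work in round r
                    phaser.arriveAndAwaitAdvance();              // wait until every task has arrived
                    if (id == 0) {
                        seen.add(done.get(r));                   // every task has completed round r by now
                    }
                }
            });
            threads[t].start();
        }
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return new ArrayList<>(seen);
    }

    public static void main(String[] args) {
        System.out.println(run(3, 2)); // prints [3, 3]
    }
}
```

The counter is kept per round so that a fast task already working on the next round cannot disturb the value read after the barrier.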
Now suppose there is a hierarchy of tasks:
         |Phaser   |Phaser   |Phaser   |Phaser   |Phaser   |Phaser   |Phaser   | ...
Master   |         |         | ------> |         |         | ------> |         | ...
Task 1.1 | ----------------> |         | ----------------> |         | ----------> ...
Task 1.2 | ----------------> |         | ----------------> |         | ----------> ...
...      |         |         |         |         |         |         |         | ...
Task 2.1 | ------> |         |         | ------> |         |         | ------> | ...
Task 2.2 | ------> |         |         | ------> |         |         | ------> | ...
...      |         |         |         |         |         |         |         | ...
Task 3.1 |         | ------> |         |         | ------> |         |         | ...
Task 3.2 |         | ------> |         |         | ------> |         |         | ...
...      |         |         |         |         |         |         |         | ...
So the tree of dependencies is like this:
                 Master
      /-----------/  \-----------\
      |                        Task 2
    Task 1                       |
      |                        Task 3
      \-----------\  /-----------/
                 Master'
In a general case, there is a tree of dependencies to solve (let's say in a game pipeline, some are AI / game logic / render tasks). Luckily there is a "big" synchronization point and the tree is fixed (but not the number of parties). It is trivial to solve with several phasers. But is it possible to use only one phaser?
Specifically, I made a program to solve the following problem.
       |phasers[0]|phasers[1]|phasers[2]|phasers[0]|phasers[1]|phasers[2]| ...
Task 1 | -------> |          |          | -------> |          |          | ...
Task 2 | -------> |          |          | -------> |          |          | ...
Task 3 |          |          | -------> |          |          | -------> | ...
Task 4 |          | -------> |          |          | -------> |          | ...
Code here:
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Phaser;

public class VolatileTester {
    private int a = 0, b = 0; // change to volatile here
    private int c = 0;
    private final int TEST_COUNT = 100_000;
    private int[] testResult = new int[TEST_COUNT];

    private static void printResult(int[] result) {
        final Map<Integer, Integer> countMap = new HashMap<>();
        for (final int n : result) {
            countMap.put(n, countMap.getOrDefault(n, 0) + 1);
        }
        countMap.forEach((n, count) -> {
            System.out.format("%d -> %d%n", n, count);
        });
    }

    private void runTask1() {
        a = 5;
        b = 10;
    }

    private void runTask2() {
        if (b == 10) {
            if (a == 5) {
                c = 1;
            } else {
                c = 2;
            }
        } else {
            if (a == 5) {
                c = 3;
            } else {
                c = 4;
            }
        }
    }

    private void runTask3() {
        // "reset task"
        a = 0;
        b = 0;
        c = 0;
    }

    private static class PhaserRunner implements Runnable {
        private final Phaser loopStartPhaser;
        private final Phaser loopEndPhaser;
        private final Runnable runnable;

        public PhaserRunner(Phaser loopStartPhaser, Phaser loopEndPhaser, Runnable runnable) {
            this.loopStartPhaser = loopStartPhaser;
            this.loopEndPhaser = loopEndPhaser;
            this.runnable = runnable;
        }

        @Override
        public void run() {
            while (loopStartPhaser.arriveAndAwaitAdvance() >= 0) {
                runnable.run();
                loopEndPhaser.arrive();
            }
        }
    }

    void runTest() throws InterruptedException {
        final Phaser[] phasers = new Phaser[]{new Phaser(3), new Phaser(3), new Phaser(2)};
        final Thread[] threads = new Thread[]{
                // build tree of dependencies here
                new Thread(new PhaserRunner(phasers[0], phasers[1], this::runTask1)),
                new Thread(new PhaserRunner(phasers[0], phasers[1], this::runTask2)),
                new Thread(new PhaserRunner(phasers[2], phasers[0], this::runTask3))
        };
        try {
            for (Thread thread : threads) {
                thread.start();
            }
            phasers[0].arrive(); // phaser of last round
            for (int i = 0; i < TEST_COUNT; i++) {
                phasers[1].arriveAndAwaitAdvance();
                // Task4 here
                testResult[i] = c;
                phasers[2].arrive();
            }
        } finally {
            for (Phaser phaser : phasers) {
                phaser.forceTermination();
            }
        }
        for (Thread thread : threads) {
            thread.join();
        }
        printResult(testResult);
    }
}
You can see that multiple Phasers are used. Is it better to keep multiple phasers (like above), or to use just one big phaser? Or are any other synchronization methods in Java recommended?
Upvotes: 2
Views: 1363
Reputation: 549
All tasks in your application proceed in a step-wise manner, which means a single phaser would suffice. The requirement is that tasks can skip phases cyclically; e.g., for every three phases, a given task runs for two phases and then skips one (is idle for one phase). In summary:
- Call arriveAndAwaitAdvance() before each step of work.
- Call arriveAndAwaitAdvance() on its own to skip a phase.
To this end, each task can use a boolean array, called enabled in the example, that specifies whether the task is enabled at a given phase number.
By using modular arithmetic (enabled[phase % enabled.length]) we can define cyclic patterns. For instance, to specify that a task should run one out of every three ticks, we declare enabled as new boolean[]{true, false, false}.
Keep in mind that tasks must advance the phase regardless of whether they perform any actual work.
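To make the cyclic pattern concrete, here is a small sketch, separate from the fixed example that follows, that only records which task is enabled in which phase. The class name `CyclicPhaseDemo`, the helper `run`, and the task names are assumptions chosen for illustration; the patterns encode the schedule from the question (Task 1 and Task 2 together, then Task 4, then Task 3). It relies on arriveAndAwaitAdvance() returning the number of the phase that has just started.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.Phaser;

public class CyclicPhaseDemo {

    /** Runs the given number of phases; returns, per phase, the sorted names of the tasks that ran. */
    static List<List<String>> run(int phases, Map<String, boolean[]> patterns) {
        final Phaser phaser = new Phaser(patterns.size());     // one party per task
        final List<ConcurrentSkipListSet<String>> log = new ArrayList<>();
        for (int i = 0; i < phases; i++) {
            log.add(new ConcurrentSkipListSet<>());
        }
        final List<Thread> threads = new ArrayList<>();
        for (Map.Entry<String, boolean[]> entry : patterns.entrySet()) {
            final String name = entry.getKey();
            final boolean[] enabled = entry.getValue();
            Thread thread = new Thread(() -> {
                int phase = 0;                                  // a new phaser starts at phase 0
                while (phase >= 0 && phase < phases) {
                    if (enabled[phase % enabled.length]) {
                        log.get(phase).add(name);               // stand-in for the real work
                    }
                    // Arrive even when idle, otherwise the other tasks would wait forever.
                    phase = phaser.arriveAndAwaitAdvance();
                }
            }, name);
            threads.add(thread);
            thread.start();
        }
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        final List<List<String>> result = new ArrayList<>();
        for (ConcurrentSkipListSet<String> names : log) {
            result.add(new ArrayList<>(names));
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, boolean[]> patterns = new LinkedHashMap<>();
        patterns.put("Task1", new boolean[]{true, false, false});
        patterns.put("Task2", new boolean[]{true, false, false});
        patterns.put("Task4", new boolean[]{false, true, false});
        patterns.put("Task3", new boolean[]{false, false, true});
        // prints [[Task1, Task2], [Task4], [Task3], [Task1, Task2], [Task4], [Task3]]
        System.out.println(run(6, patterns));
    }
}
```

Because this sketch runs a fixed number of phases, every task arrives the same number of times and no forceTermination() call is needed.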
I fixed your example accordingly:
import java.util.concurrent.*;
import java.util.*;

public class VolatileTester {
    private int a = 0, b = 0; // change to volatile here
    private int c = 0;
    private final int TEST_COUNT = 100;
    private int[] testResult = new int[TEST_COUNT];

    private static void printResult(int[] result) {
        final Map<Integer, Integer> countMap = new HashMap<>();
        for (final int n : result) {
            countMap.put(n, countMap.getOrDefault(n, 0) + 1);
        }
        countMap.forEach((n, count) -> {
            System.out.format("%d -> %d%n", n, count);
        });
    }

    private void runTask1() {
        a = 5;
        b = 10;
    }

    private void runTask2() {
        if (b == 10) {
            if (a == 5) {
                c = 1;
            } else {
                c = 2;
            }
        } else {
            if (a == 5) {
                c = 3;
            } else {
                c = 4;
            }
        }
    }

    private void runTask3() {
        // "reset task"
        a = 0;
        b = 0;
        c = 0;
    }

    private static class PhaserRunner implements Runnable {
        private final Phaser phaser;
        private final Runnable runnable;
        private boolean[] enabled;

        public PhaserRunner(Phaser phaser, boolean[] enabled, Runnable runnable) {
            this.phaser = phaser;
            this.runnable = runnable;
            this.enabled = enabled;
        }

        @Override
        public void run() {
            int phase;
            for (;;) {
                phase = phaser.arriveAndAwaitAdvance();
                if (phase < 0) {
                    break;
                } else if (enabled[phase % enabled.length]) {
                    System.out.println("I'm running: " + Thread.currentThread());
                    runnable.run();
                }
            }
        }
    }

    public void runTest() throws InterruptedException {
        final Phaser phaser = new Phaser(4);
        final Thread[] threads = new Thread[]{
                // build tree of dependencies here
                new Thread(new PhaserRunner(phaser, new boolean[]{true, false, false}, this::runTask1), "T1"),
                new Thread(new PhaserRunner(phaser, new boolean[]{false, false, true}, this::runTask2), "T2"),
                new Thread(new PhaserRunner(phaser, new boolean[]{false, true, false}, this::runTask3), "T3")
        };
        try {
            for (Thread thread : threads) {
                thread.start();
            }
            for (int i = 0; i < TEST_COUNT; i++) {
                testResult[i] = c;
                phaser.arriveAndAwaitAdvance();
            }
        } finally {
            phaser.forceTermination();
        }
        for (Thread thread : threads) {
            thread.join();
        }
        printResult(testResult);
    }

    public static void main(String[] args) throws Exception {
        new VolatileTester().runTest();
    }
}
Upvotes: 2
Reputation: 73568
Yes, you can get by with a single Phaser. CyclicBarrier has the Runnable barrierAction that is run after each cycle. Phaser supports similar functionality through overriding onAdvance.
When the final party for a given phase arrives, an optional action is performed and the phase advances. These actions are performed by the party triggering a phase advance, and are arranged by overriding method onAdvance(int, int), which also controls termination. Overriding this method is similar to, but more flexible than, providing a barrier action to a CyclicBarrier.
So essentially
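Phaser phaser = new Phaser() {
    @Override
    protected boolean onAdvance(int phase, int registeredParties) {
        // Signal Master thread to perform its task and wait for it to finish
        // Returning true terminates the phaser; keep the default rule here.
        return super.onAdvance(phase, registeredParties);
    }
};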
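A runnable sketch of this idea might look as follows. The class name `OnAdvanceDemo`, the helper `run`, and the work counter are assumptions for illustration; here the "master" step simply runs inside onAdvance, on whichever worker thread arrives last, instead of signalling a separate Master thread.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicInteger;

public class OnAdvanceDemo {

    /** Runs `rounds` phases with `workers` threads; returns what the master step saw after each phase. */
    static List<String> run(int workers, int rounds) {
        final AtomicInteger workDone = new AtomicInteger();
        final List<String> events = Collections.synchronizedList(new ArrayList<>());
        final Phaser phaser = new Phaser(workers) {
            @Override
            protected boolean onAdvance(int phase, int registeredParties) {
                // Master step: runs once per phase, after all workers have arrived
                // and before any of them is released into the next phase.
                events.add("phase " + phase + ": " + workDone.get());
                return phase + 1 >= rounds || registeredParties == 0; // true terminates the phaser
            }
        };
        final Thread[] threads = new Thread[workers];
        for (int i = 0; i < workers; i++) {
            threads[i] = new Thread(() -> {
                while (!phaser.isTerminated()) {
                    workDone.incrementAndGet();        // stand-in for one unit of work
                    phaser.arriveAndAwaitAdvance();
                }
            });
            threads[i].start();
        }
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return new ArrayList<>(events);
    }

    public static void main(String[] args) {
        System.out.println(run(3, 2)); // prints [phase 0: 3, phase 1: 6]
    }
}
```

The loop checks isTerminated() rather than the return value of arriveAndAwaitAdvance(), so every worker, including the one that triggered the final advance, stops after the last round.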
Upvotes: 0