HKTonyLee

Reputation: 3300

Phaser Synchronization Usage

General Question

It is well known that a Phaser can be used to synchronize the start time of all tasks, as mentioned in the JavaDocs and in this blog post by Niklas Schlimm.

Niklas drew a clear picture of the synchronization:

         |Phaser   |Phaser   |Phaser   |  
Task 1   | ------> | ------> | ------> | ...
Task 2   | ------> | ------> | ------> | ...
...
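The lockstep pattern in the diagram can be sketched as follows. This is a minimal illustration, not code from the JavaDocs or the blog post: the class name LockstepSketch and the method run are made up, and each task only records the phase number it observes instead of doing real work.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Phaser;

// Hypothetical class name; only java.util.concurrent.Phaser comes from the JDK.
class LockstepSketch {

    /** Runs `tasks` threads for `steps` steps each and returns the phase each thread saw per step. */
    static List<List<Integer>> run(int tasks, int steps) {
        final Phaser phaser = new Phaser(tasks); // one registered party per task
        final List<List<Integer>> seen = new ArrayList<>();
        final Thread[] threads = new Thread[tasks];

        for (int t = 0; t < tasks; t++) {
            final List<Integer> mine = new ArrayList<>(); // written by one thread only
            seen.add(mine);
            threads[t] = new Thread(() -> {
                for (int s = 0; s < steps; s++) {
                    mine.add(phaser.getPhase());    // the work of step s happens in phase s
                    phaser.arriveAndAwaitAdvance(); // wait until every task has finished step s
                }
            });
        }
        for (Thread thread : threads) {
            thread.start();
        }
        for (Thread thread : threads) {
            try {
                thread.join(); // join() also makes the workers' writes visible here
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run(2, 3)); // prints [[0, 1, 2], [0, 1, 2]]
    }
}
```

No task can begin step s + 1 until every task has arrived at the end of step s, which is what the vertical bars in the picture stand for.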

Now suppose there is a hierarchy of tasks:

         |Phaser   |Phaser   |Phaser   |Phaser   |Phaser   |Phaser   |Phaser   |   ...
Master   |         |         | ------> |         |         | ------> |         |   ...
Task 1.1 | ----------------> |         | ----------------> |         | ----------> ...
Task 1.2 | ----------------> |         | ----------------> |         | ----------> ...
...      |         |         |         |         |         |         |         |   ...
Task 2.1 | ------> |         |         | ------> |         |         | ------> |   ...
Task 2.2 | ------> |         |         | ------> |         |         | ------> |   ...
...      |         |         |         |         |         |         |         |   ...
Task 3.1 |         | ------> |         |         | ------> |         |         |   ...
Task 3.2 |         | ------> |         |         | ------> |         |         |   ...
...      |         |         |         |         |         |         |         |   ...

So the tree of dependencies is like this:

                      Master
           /-----------/  \-----------\
           |                        Task 2 
         Task 1                       |
           |                        Task 3
           \-----------\  /-----------/
                      Master'

In the general case, there is a tree of dependencies to resolve (say, in a game pipeline, where some tasks are AI, game logic, or rendering). Luckily, there is one "big" synchronization point and the tree is fixed (though the number of parties is not). This is trivial to solve with several phasers, but is it possible to use only one?

One Special Case

Specifically, I wrote a program to solve the following problem.

         |phasers[0]|phasers[1]|phasers[2]|phasers[0]|phasers[1]|phasers[2]| ...
Task 1   | -------> |          |          | -------> |          |          | ...
Task 2   | -------> |          |          | -------> |          |          | ...
Task 3   |          |          | -------> |          |          | -------> | ...
Task 4   |          | -------> |          |          | -------> |          | ...

Code here:

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Phaser;

public class VolatileTester {

    private int a = 0, b = 0;       // change to volatile here
    private int c = 0;

    private final int TEST_COUNT = 100_000;
    private int[] testResult = new int[TEST_COUNT];

    private static void printResult(int[] result) {
        final Map<Integer, Integer> countMap = new HashMap<>();
        for (final int n : result) {
            countMap.put(n, countMap.getOrDefault(n, 0) + 1);
        }

        countMap.forEach((n, count) -> {
            System.out.format("%d -> %d%n", n, count);
        });
    }

    private void runTask1() {
        a = 5;
        b = 10;
    }

    private void runTask2() {
        if (b == 10) {
            if (a == 5) {
                c = 1;
            } else {
                c = 2;
            }
        } else {
            if (a == 5) {
                c = 3;
            } else {
                c = 4;
            }
        }
    }

    private void runTask3() {
        // "reset task"
        a = 0;
        b = 0;
        c = 0;
    }

    private static class PhaserRunner implements Runnable {
        private final Phaser loopStartPhaser;
        private final Phaser loopEndPhaser;
        private final Runnable runnable;

        public PhaserRunner(Phaser loopStartPhaser, Phaser loopEndPhaser, Runnable runnable) {
            this.loopStartPhaser = loopStartPhaser;
            this.loopEndPhaser = loopEndPhaser;
            this.runnable = runnable;
        }

        @Override
        public void run() {
            while (loopStartPhaser.arriveAndAwaitAdvance() >= 0) {
                runnable.run();
                loopEndPhaser.arrive();
            }
        }
    }

    void runTest() throws InterruptedException {
        final Phaser[] phasers = new Phaser[]{new Phaser(3), new Phaser(3), new Phaser(2)};

        final Thread[] threads = new Thread[]{
                // build tree of dependencies here
                new Thread(new PhaserRunner(phasers[0], phasers[1], this::runTask1)),
                new Thread(new PhaserRunner(phasers[0], phasers[1], this::runTask2)),
                new Thread(new PhaserRunner(phasers[2], phasers[0], this::runTask3))
        };

        try {
            for (Thread thread : threads) {
                thread.start();
            }

            phasers[0].arrive();        // phaser of last round

            for (int i = 0; i < TEST_COUNT; i++) {
                phasers[1].arriveAndAwaitAdvance();

                // Task4 here
                testResult[i] = c;

                phasers[2].arrive();
            }
        } finally {
            for (Phaser phaser : phasers) {
                phaser.forceTermination();
            }
        }

        for (Thread thread : threads) {
            thread.join();
        }

        printResult(testResult);
    }
}

As you can see, multiple Phasers are used. Is it better to keep multiple phasers (as above) or to use one big phaser? Or is some other Java synchronization mechanism recommended?

Upvotes: 2

Views: 1363

Answers (2)

Tiago Cogumbreiro

Reputation: 549

All tasks in your application proceed in a stepwise manner, which means a single phaser suffices. The requirement is that tasks can skip phases cyclically; e.g., out of every three phases, a given task runs for two and then skips one (it is idle for that phase). In summary:

  • A task should execute arriveAndAwaitAdvance() before each step of work.
  • To skip a phase, a task should invoke arriveAndAwaitAdvance() and do no work.

To this end, each task can use a boolean array, called enabled in the example, that specifies whether the task is enabled at a given phase number. By using modular arithmetic (enabled[phase % enabled.length]) we can define cyclic patterns. For instance, to specify that a task should run in one out of every three phases, we declare enabled as new boolean[]{true, false, false}.

Keep in mind that tasks must advance the phase regardless of whether they perform any actual work.
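Stripped down to the mask logic alone, the idea looks roughly like this. It is a sketch under assumptions rather than the full fix: the class EnabledMaskSketch, the helper worker, and the lastPhase cutoff are invented for illustration, and each task only logs its name and phase instead of doing work.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Phaser;

// Hypothetical names throughout; only Phaser is a JDK class.
class EnabledMaskSketch {

    static Thread worker(Phaser phaser, String name, boolean[] enabled,
                         int lastPhase, List<String> log) {
        return new Thread(() -> {
            for (;;) {
                // Always arrive, even in phases this task sits out.
                int phase = phaser.arriveAndAwaitAdvance(); // number of the phase just entered
                if (phase < 0 || phase > lastPhase) {
                    break; // phaser terminated, or all requested phases are done
                }
                if (enabled[phase % enabled.length]) {
                    log.add(name + "@" + phase); // stands in for the real work
                }
            }
        }, name);
    }

    /** Runs phases 1..lastPhase and returns which task worked in which phase, in order. */
    static List<String> run(int lastPhase) {
        final Phaser phaser = new Phaser(3); // one party per worker
        final List<String> log = Collections.synchronizedList(new ArrayList<>());
        final Thread[] threads = {
            worker(phaser, "A", new boolean[]{true, false, false}, lastPhase, log),
            worker(phaser, "B", new boolean[]{false, true, false}, lastPhase, log),
            worker(phaser, "C", new boolean[]{false, false, true}, lastPhase, log)
        };
        for (Thread thread : threads) {
            thread.start();
        }
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        System.out.println(run(6)); // prints [B@1, C@2, A@3, B@4, C@5, A@6]
    }
}
```

Note that the first value returned by arriveAndAwaitAdvance() is 1, not 0, so index 0 of each mask is first consulted at phase 3. Rotate the masks if a particular task has to go first.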

I fixed your example accordingly:

import java.util.concurrent.*;
import java.util.*;

public class VolatileTester {

    private int a = 0, b = 0;       // change to volatile here
    private int c = 0;

    private final int TEST_COUNT = 100;
    private int[] testResult = new int[TEST_COUNT];

    private static void printResult(int[] result) {
        final Map<Integer, Integer> countMap = new HashMap<>();
        for (final int n : result) {
            countMap.put(n, countMap.getOrDefault(n, 0) + 1);
        }

        countMap.forEach((n, count) -> {
            System.out.format("%d -> %d%n", n, count);
        });
    }

    private void runTask1() {
        a = 5;
        b = 10;
    }

    private void runTask2() {
        if (b == 10) {
            if (a == 5) {
                c = 1;
            } else {
                c = 2;
            }
        } else {
            if (a == 5) {
                c = 3;
            } else {
                c = 4;
            }
        }
    }

    private void runTask3() {
        // "reset task"
        a = 0;
        b = 0;
        c = 0;
    }

    private static class PhaserRunner implements Runnable {
        private final Phaser phaser;
        private final Runnable runnable;
        private final boolean[] enabled;

        public PhaserRunner(Phaser phaser, boolean[] enabled, Runnable runnable) {
            this.phaser = phaser;
            this.runnable = runnable;
            this.enabled = enabled;
        }

        @Override
        public void run() {
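            int phase;
            for (;;) {
                // Arrive in every phase, even idle ones, so the phaser can advance.
                phase = phaser.arriveAndAwaitAdvance();
                if (phase < 0) {
                    // A negative value means the phaser has been terminated.
                    break;
                } else if (enabled[phase % enabled.length]) {
                    System.out.println("I'm running: " + Thread.currentThread());
                    runnable.run();
                }
            }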
        }
    }

    public void runTest() throws InterruptedException {
        final Phaser phaser = new Phaser(4);

        final Thread[] threads = new Thread[]{
                // build tree of dependencies here
                new Thread(new PhaserRunner(phaser, new boolean[]{true, false, false}, this::runTask1), "T1"),
                new Thread(new PhaserRunner(phaser, new boolean[]{false, false, true}, this::runTask2), "T2"),
                new Thread(new PhaserRunner(phaser, new boolean[]{false, true, false}, this::runTask3), "T3")
        };

        try {
            for (Thread thread : threads) {
                thread.start();
            }

            for (int i = 0; i < TEST_COUNT; i++) {
                testResult[i] = c;
                phaser.arriveAndAwaitAdvance();
            }
        } finally {
            phaser.forceTermination();
        }

        for (Thread thread : threads) {
            thread.join();
        }

        printResult(testResult);
    }

    public static void main(String[]args) throws Exception {
        new VolatileTester().runTest();
    }
}

Upvotes: 2

Kayaman

Reputation: 73568

Yes, you can get by with a single Phaser. CyclicBarrier has the Runnable barrierAction that is run after each cycle. Phaser supports similar functionality by overriding onAdvance. From the Phaser Javadoc:

When the final party for a given phase arrives, an optional action is performed and the phase advances. These actions are performed by the party triggering a phase advance, and are arranged by overriding method onAdvance(int, int), which also controls termination. Overriding this method is similar to, but more flexible than, providing a barrier action to a CyclicBarrier.

So essentially

Phaser phaser = new Phaser() {
    @Override
    protected boolean onAdvance(int phase, int parties) {
        // Signal Master thread to perform its task and wait for it to finish
        return super.onAdvance(phase, parties); // default: terminate once no parties remain
    }
};
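A runnable version of that idea might look like the sketch below. It makes some assumptions: the class name OnAdvanceSketch and the log strings are invented, and the master step runs directly inside onAdvance instead of signalling a separate master thread.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Phaser;

// Hypothetical class; the "master" step is just a log entry here.
class OnAdvanceSketch {

    /** Two workers run `rounds` steps; the master step runs once after each round. */
    static List<String> run(int rounds) {
        final List<String> log = Collections.synchronizedList(new ArrayList<>());

        final Phaser phaser = new Phaser(2) { // two worker parties
            @Override
            protected boolean onAdvance(int phase, int registeredParties) {
                // Runs once per phase, in whichever thread arrived last,
                // while the other parties are still blocked at the barrier.
                log.add("master " + phase);
                // Returning true terminates the phaser.
                return phase >= rounds - 1 || registeredParties == 0;
            }
        };

        final Runnable worker = () -> {
            do {
                log.add("work"); // stands in for one step of a real task
                phaser.arriveAndAwaitAdvance();
            } while (!phaser.isTerminated());
        };

        final Thread[] threads = {new Thread(worker), new Thread(worker)};
        for (Thread thread : threads) {
            thread.start();
        }
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        System.out.println(run(2)); // prints [work, work, master 0, work, work, master 1]
    }
}
```

Because onAdvance runs before the phase number changes, no worker can start its next step until the master step has returned. The trade-off is that a long master step blocks every party at the barrier.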

Upvotes: 0
