swch

Reputation: 1546

How to create chain/tier of Phasers

I'm writing a multithreaded application that uses a Phaser to know when all work is finished. The problem is that the ExecutorCompletionService can have as many as 100k tasks queued, but a Phaser supports at most 65535 unarrived parties. What can I do when the 65536th party arrives?

My example code:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;

public class Main {
public static void main(String[] args) throws Exception {
    ExecutorService ec = Executors.newFixedThreadPool(10);
    ExecutorCompletionService<List<String>> ecs = new ExecutorCompletionService<List<String>>(
            ec);
    Phaser phaser = new Phaser();

    // register first node/thread
    ecs.submit(new SimpleParser("startfile.txt"));
    phaser.register();

    Future<List<String>> future;
    do {
        future = ecs.poll();
        if(future!=null && future.get() != null) {
            addParties(phaser, future.get(), ecs);
            phaser.arriveAndDeregister();
        }

        if (phaser.isTerminated()) {
            ec.shutdown();
        }
    } while (!ec.isShutdown() && !phaser.isTerminated());
}

public static void addParties(Phaser p, List<String> filenames,
        ExecutorCompletionService<List<String>> ecs) {
    for (int i = 0; i < filenames.size(); i++) {
        ecs.submit(new SimpleParser(filenames.get(i)));
        //PROBLEM = What to do when Phaser has 65535+ unarrived parties
        p.register();
    }
}

static class SimpleParser implements Callable<List<String>> {

    String fileName;

    public SimpleParser(String fileName) {
        this.fileName = fileName;
    }

    @Override
    public List<String> call() throws Exception {
        return parseFile();
    }

    private List<String> parseFile() {
        return new ArrayList<String>(Arrays.asList(new String[] {
                "somefilename1.txt", "somefilename2.txt" }));
    }

}
}

The problem is in the addParties() method. A single task (SimpleParser) can return, for example, 100 new file names, so 100 new tasks are submitted to the ExecutorCompletionService and 100 new parties are registered with the Phaser. I have tried something like this:

if(p.getUnarrivedParties() == 65535)
            p = new Phaser(p);

to create a chain of phasers, but it didn't help: p.getUnarrivedParties() returns 0 on the new Phaser, yet I still can't register the next party with it:

    System.out.println(p.getUnarrivedParties());
        if(p.getUnarrivedParties() == 65535) {
            p = new Phaser(p);
            System.out.println(p.getUnarrivedParties());
        }
        p.register();

prints:

65535

0

and throws an IllegalStateException.

So how can I create a new Phaser that is connected to the old one?

//edit

Thank you @bowmore. I have just two more questions.

Let's look at the example:

import java.util.concurrent.Phaser;

public class Test2 {
    public static void main(String[] args) {
        Phaser parent = new Phaser();
        Phaser child1 = new Phaser(parent);
        Phaser child2 = new Phaser(parent);
        child1.register();
        child2.register();

        System.out.println("Parent: "+parent.isTerminated());
        System.out.println("Child1: "+child1.isTerminated());
        System.out.println("Child2: "+child2.isTerminated()+"\n");

        child1.arriveAndDeregister();
        System.out.println("Parent: "+parent.isTerminated());
        System.out.println("Child1: "+child1.isTerminated());
        System.out.println("Child2: "+child2.isTerminated()+"\n");

        child2.arriveAndDeregister();
        System.out.println("Parent: "+parent.isTerminated());
        System.out.println("Child1: "+child1.isTerminated());
        System.out.println("Child2: "+child2.isTerminated()+"\n");
    }
}

It prints:

Parent: false
Child1: false
Child2: false

Parent: false
Child1: false
Child2: false

Parent: true
Child1: true
Child2: true

Why is child1 not terminated after child1.arriveAndDeregister(), and how can I check whether it actually is?

Second question: I asked about creating a new Phaser only after reaching 65535 parties because I thought it would be wasteful to create thousands of new objects. Do you think that could cause memory problems, or might it even improve performance?

Upvotes: 3

Views: 1430

Answers (1)

bowmore

Reputation: 11308

Instead of registering with the existing Phaser, new tasks can register with a newly created child Phaser of the original. You create a child Phaser simply by passing the parent Phaser to the child's constructor.

public static void addParties(Phaser p, List<String> filenames,
                              ExecutorCompletionService<List<String>> ecs) {
    Phaser newPhaser = new Phaser(p);
    for (int i = 0; i < filenames.size(); i++) {
        ecs.submit(new SimpleParser(filenames.get(i)));
        newPhaser.register();
    }
}
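
To make the effect on the parent's party count concrete, here is a minimal, self-contained sketch. The class and method names (ChildPhaserDemo, registerOnChild) are made up for illustration. It shows that a child with several registered parties occupies only one party slot in its parent:

```java
import java.util.concurrent.Phaser;

public class ChildPhaserDemo {

    /**
     * Registers `parties` parties on a fresh child phaser and returns
     * {parties registered on the child, parties registered on the parent}.
     */
    static int[] registerOnChild(int parties) {
        Phaser parent = new Phaser();
        Phaser child = new Phaser(parent);
        for (int i = 0; i < parties; i++) {
            child.register();
        }
        return new int[] { child.getRegisteredParties(), parent.getRegisteredParties() };
    }

    public static void main(String[] args) {
        int[] counts = registerOnChild(3);
        System.out.println("child parties:  " + counts[0]); // 3
        System.out.println("parent parties: " + counts[1]); // 1
    }
}
```

One thing to keep in mind with this layout: a party that registered on a child should also arrive on that same child. Arriving on the parent instead would use up the single slot that the child holds there.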

If you want to create child Phasers only when a certain threshold is reached, you can check the number of registered parties rather than the number of unarrived ones:

public static void addParties(Phaser p, List<String> filenames, ExecutorCompletionService<List<String>> ecs) {
    Phaser toRegister = p.getRegisteredParties() > THRESHOLD ? new Phaser(p) : p;
    for (int i = 0; i < filenames.size(); i++) {
        ecs.submit(new SimpleParser(filenames.get(i)));
        // registers on the child once p holds more than THRESHOLD parties (THRESHOLD: a constant you choose, below 65535)
        toRegister.register();
    }
    System.out.println(p.getRegisteredParties());
}
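
A caveat on choosing the threshold, sketched below with hypothetical names (PhaserLimitDemo, canRegisterOnNewChild): a child needs one free party slot in its parent the first time something registers on it. If the parent is already at 65535 parties, the first register() on a new child throws an IllegalStateException, which would explain the exception in the question. The threshold therefore has to stay below the limit.

```java
import java.util.concurrent.Phaser;

public class PhaserLimitDemo {
    static final int MAX_PARTIES = 65535; // documented Phaser party limit

    /**
     * Builds a root phaser with `rootParties` registered parties, attaches a
     * new child, and reports whether one party can register on that child.
     */
    static boolean canRegisterOnNewChild(int rootParties) {
        Phaser root = new Phaser(rootParties);
        Phaser child = new Phaser(root);
        try {
            child.register(); // the child's first party also claims a slot in root
            return true;
        } catch (IllegalStateException e) {
            return false;     // root had no free slot left for the child
        }
    }

    public static void main(String[] args) {
        System.out.println(canRegisterOnNewChild(MAX_PARTIES));     // false
        System.out.println(canRegisterOnNewChild(MAX_PARTIES - 1)); // true
    }
}
```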

EDIT:

To follow up on question 1: child Phasers share their termination state with the root Phaser. Here's the implementation of isTerminated():

public boolean isTerminated() {
    return root.state < 0L;
}
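
Since isTerminated() only reflects the root, one possible way to see whether a single child is finished is to look at its own party count. The sketch below assumes that "finished" means "no registered parties left". The names ChildStateDemo, isDrained and run are invented for this example.

```java
import java.util.concurrent.Phaser;

public class ChildStateDemo {

    /** Treats a phaser as drained once it has no registered parties left. */
    static boolean isDrained(Phaser p) {
        return p.getRegisteredParties() == 0;
    }

    /**
     * Replays the scenario from the question and returns
     * {child1 drained after step 1, child1 terminated after step 1,
     *  child1 terminated after step 2}.
     */
    static boolean[] run() {
        Phaser parent = new Phaser();
        Phaser child1 = new Phaser(parent);
        Phaser child2 = new Phaser(parent);
        child1.register();
        child2.register();

        child1.arriveAndDeregister();              // step 1: only child1 is done
        boolean drained = isDrained(child1);
        boolean terminatedEarly = child1.isTerminated();

        child2.arriveAndDeregister();              // step 2: root loses its last party
        return new boolean[] { drained, terminatedEarly, child1.isTerminated() };
    }

    public static void main(String[] args) {
        boolean[] r = run();
        System.out.println("child1 drained after step 1:    " + r[0]); // true
        System.out.println("child1 terminated after step 1: " + r[1]); // false
        System.out.println("child1 terminated after step 2: " + r[2]); // true
    }
}
```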

To follow up on question 2: parent Phasers do not actually keep references to their child Phasers. Once a child Phaser is no longer referenced, it becomes eligible for garbage collection. Your best bet is simply to follow the advice in the Javadoc:

The best value of TASKS_PER_PHASER depends mainly on expected synchronization rates. A value as low as four may be appropriate for extremely small per-phase task bodies (thus high rates), or up to hundreds for extremely large ones.

The main reason for tiering is to reduce heavy synchronization contention, so if you have lightweight tasks, fewer tasks per Phaser is better. It never hurts to profile different settings when tuning this.
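
As a rough illustration of tiering at the scale mentioned in the question, the sketch below registers 100,000 parties under one root by opening a new child every tasksPerPhaser registrations. It is single-threaded and only meant to show the bookkeeping. TieredRegistrationDemo, registerTiered, simulate and the value 1,000 are assumptions made for this example, not recommendations.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Phaser;

public class TieredRegistrationDemo {

    /**
     * Registers `tasks` parties under `root`, opening a new child phaser every
     * `tasksPerPhaser` registrations. The returned list holds, for each task,
     * the phaser that task has to arrive on later.
     */
    static List<Phaser> registerTiered(Phaser root, int tasks, int tasksPerPhaser) {
        List<Phaser> owners = new ArrayList<>(tasks);
        Phaser current = null;
        for (int i = 0; i < tasks; i++) {
            if (i % tasksPerPhaser == 0) {
                current = new Phaser(root); // start a new tier member
            }
            current.register();
            owners.add(current);
        }
        return owners;
    }

    /** Returns {root party count after registration, 1 if root terminated at the end else 0}. */
    static int[] simulate(int tasks, int tasksPerPhaser) {
        Phaser root = new Phaser();
        List<Phaser> owners = registerTiered(root, tasks, tasksPerPhaser);
        int rootParties = root.getRegisteredParties();
        for (Phaser owner : owners) {
            owner.arriveAndDeregister(); // each task arrives on its own phaser
        }
        return new int[] { rootParties, root.isTerminated() ? 1 : 0 };
    }

    public static void main(String[] args) {
        int[] result = simulate(100_000, 1_000);
        System.out.println("root parties:    " + result[0]);        // 100
        System.out.println("root terminated: " + (result[1] == 1)); // true
    }
}
```

With these numbers the root only ever sees 100 parties, well below the 65535 limit, even though 100,000 tasks are tracked in total.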

Upvotes: 3
