Reputation: 1546
I'm writing a multithreaded application that uses a Phaser to detect when all work is finished. The problem is that the ExecutorCompletionService queue can hold as many as 100k tasks, but a Phaser supports at most 65535 unarrived parties. What can I do when the 65536th party needs to register?
My example code:
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;
public class Main {
    public static void main(String[] args) throws Exception {
        ExecutorService ec = Executors.newFixedThreadPool(10);
        ExecutorCompletionService<List<String>> ecs = new ExecutorCompletionService<List<String>>(
                ec);
        Phaser phaser = new Phaser();
        // register first node/thread
        ecs.submit(new SimpleParser("startfile.txt"));
        phaser.register();
        Future<List<String>> future;
        do {
            future = ecs.poll();
            if (future != null && future.get() != null) {
                addParties(phaser, future.get(), ecs);
                phaser.arriveAndDeregister();
            }
            if (phaser.isTerminated()) {
                ec.shutdown();
            }
        } while (!ec.isShutdown() && !phaser.isTerminated());
    }
    public static void addParties(Phaser p, List<String> filenames,
            ExecutorCompletionService<List<String>> ecs) {
        for (int i = 0; i < filenames.size(); i++) {
            ecs.submit(new SimpleParser(filenames.get(i)));
            //PROBLEM = What to do when Phaser has 65535+ unarrived parties
            p.register();
        }
    }
    static class SimpleParser implements Callable<List<String>> {
        String fileName;
        public SimpleParser(String fileName) {
            this.fileName = fileName;
        }
        @Override
        public List<String> call() throws Exception {
            return parseFile();
        }
        private List<String> parseFile() {
            return new ArrayList<String>(Arrays.asList(new String[] {
                    "somefilename1.txt", "somefilename2.txt" }));
        }
    }
}
The problem is in the addParties() method. A single task (SimpleParser) can return, e.g., 100 new file names, in which case 100 new tasks are submitted to the ExecutorCompletionService and 100 new parties are registered with the Phaser. I tried something like this:
if (p.getUnarrivedParties() == 65535)
    p = new Phaser(p);
to create a chain of phasers, but it didn't help: on the new phaser, p.getUnarrivedParties() returns 0, yet I still can't register the next party on it:
System.out.println(p.getUnarrivedParties());
if (p.getUnarrivedParties() == 65535) {
    p = new Phaser(p);
    System.out.println(p.getUnarrivedParties());
}
p.register();
This prints:
65535
0
and then throws an IllegalStateException.
So how can I create a new Phaser that is connected to the old one?
//edit
Thank you @bowmore. I have just two more questions.
Let's look at the example:
import java.util.concurrent.Phaser;
public class Test2 {
    public static void main(String[] args) {
        Phaser parent = new Phaser();
        Phaser child1 = new Phaser(parent);
        Phaser child2 = new Phaser(parent);
        child1.register();
        child2.register();
        System.out.println("Parent: "+parent.isTerminated());
        System.out.println("Child1: "+child1.isTerminated());
        System.out.println("Child2: "+child2.isTerminated()+"\n");
        child1.arriveAndDeregister();
        System.out.println("Parent: "+parent.isTerminated());
        System.out.println("Child1: "+child1.isTerminated());
        System.out.println("Child2: "+child2.isTerminated()+"\n");
        child2.arriveAndDeregister();
        System.out.println("Parent: "+parent.isTerminated());
        System.out.println("Child1: "+child1.isTerminated());
        System.out.println("Child2: "+child2.isTerminated()+"\n");
    }
}
It prints:
Parent: false
Child1: false
Child2: false
Parent: false
Child1: false
Child2: false
Parent: true
Child1: true
Child2: true
Why is child1 not terminated after child1.arriveAndDeregister(), and how can I check whether it actually is?
Second question: I asked about creating a new Phaser only after reaching 65535 parties because I thought it would be wasteful to create thousands of new objects. Do you think that could cause memory problems, or might it even improve performance?
Reputation: 11308
Instead of registering with the existing Phaser, new processes can register on a newly created child Phaser of the original. Creating a child Phaser is done simply by supplying the parent Phaser to the constructor of the child.
public static void addParties(Phaser p, List<String> filenames,
        ExecutorCompletionService<List<String>> ecs) {
    Phaser newPhaser = new Phaser(p);
    for (int i = 0; i < filenames.size(); i++) {
        ecs.submit(new SimpleParser(filenames.get(i)));
        newPhaser.register();
    }
}
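As a minimal sketch of why this helps: a child Phaser occupies only one party slot on its parent, no matter how many parties register on the child. The class and method names below (ChildPhaserDemo, registerOnChild) are made up for illustration and are not part of the code above.

```java
import java.util.concurrent.Phaser;

class ChildPhaserDemo {
    // Registers n parties on a fresh child phaser and reports the resulting
    // party counts as {parent parties, child parties}.
    static int[] registerOnChild(int n) {
        Phaser parent = new Phaser();
        Phaser child = new Phaser(parent);
        // The child registers itself with the parent once, when its own
        // party count becomes non-zero.
        child.bulkRegister(n);
        return new int[] { parent.getRegisteredParties(), child.getRegisteredParties() };
    }

    public static void main(String[] args) {
        int[] counts = registerOnChild(1000);
        System.out.println("parent parties: " + counts[0]);
        System.out.println("child parties:  " + counts[1]);
    }
}
```

Keep in mind that a party registered on a child has to arrive on that same child, not on the parent, so each task needs access to the phaser it was registered with.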
If you want to create child Phasers only when a certain threshold is reached, you can check the number of registered parties, rather than the number of unarrived ones:
public static void addParties(Phaser p, List<String> filenames, ExecutorCompletionService<List<String>> ecs) {
    Phaser toRegister = p.getRegisteredParties() > THRESHOLD ? new Phaser(p) : p;
    for (int i = 0; i < filenames.size(); i++) {
        ecs.submit(new SimpleParser(filenames.get(i)));
        //PROBLEM = What to do when Phaser has 65535+ unarrived parties
        toRegister.register();
    }
    System.out.println(p.getRegisteredParties());
}
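The threshold should stay below the 65535 limit, because the child itself needs a free slot on the parent when its first party registers. That is probably what went wrong in the attempt from the question, where the child was created only once the parent was already full. A small sketch, with hypothetical names (FullParentDemo and its methods), that shows both cases:

```java
import java.util.concurrent.Phaser;

class FullParentDemo {
    static final int MAX_PARTIES = 65535; // per-phaser limit mentioned in the question

    // Returns true if registering on a child of an already full parent throws.
    static boolean childOfFullParentFails() {
        Phaser parent = new Phaser(MAX_PARTIES);
        Phaser child = new Phaser(parent);
        try {
            // The first registration on the child also registers the child
            // with the parent, which has no slot left.
            child.register();
            return false;
        } catch (IllegalStateException expected) {
            return true;
        }
    }

    // Returns the parent's party count after a child is attached while the
    // parent still has exactly one free slot.
    static int childOfNearlyFullParent() {
        Phaser parent = new Phaser(MAX_PARTIES - 1);
        Phaser child = new Phaser(parent);
        child.bulkRegister(1000); // takes a single slot on the parent
        return parent.getRegisteredParties();
    }

    public static void main(String[] args) {
        System.out.println("full parent rejects child: " + childOfFullParentFails());
        System.out.println("parent parties with one child: " + childOfNearlyFullParent());
    }
}
```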
EDIT :
To follow up on question 1: child Phasers share their termination state with the root Phaser. Here's the implementation of isTerminated():
public boolean isTerminated() {
    return root.state < 0L;
}
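So isTerminated() on a child only tells you about the whole tree. If you want to know whether one particular child has no parties left, one option (a suggestion, not something the javadoc prescribes) is to check its registered party count. A sketch based on the Test2 example from the question, with a made-up class name:

```java
import java.util.concurrent.Phaser;

class SharedTerminationDemo {
    // Returns {child1.isTerminated() after only child1 left,
    //          child1 has no registered parties at that point,
    //          every phaser reports terminated after child2 left too}.
    static boolean[] observe() {
        Phaser parent = new Phaser();
        Phaser child1 = new Phaser(parent);
        Phaser child2 = new Phaser(parent);
        child1.register();
        child2.register();

        child1.arriveAndDeregister();
        boolean child1Terminated = child1.isTerminated(); // reads the root's state
        boolean child1Empty = child1.getRegisteredParties() == 0;

        child2.arriveAndDeregister(); // last party in the tree: the root terminates
        boolean allTerminated = parent.isTerminated()
                && child1.isTerminated() && child2.isTerminated();
        return new boolean[] { child1Terminated, child1Empty, allTerminated };
    }

    public static void main(String[] args) {
        boolean[] r = observe();
        System.out.println("child1 terminated early: " + r[0]);
        System.out.println("child1 empty:            " + r[1]);
        System.out.println("all terminated at end:   " + r[2]);
    }
}
```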
To follow up on question 2: parent Phasers do not actually keep references to their child Phasers. Once a child Phaser is no longer referenced, it becomes eligible for garbage collection. You'd best simply follow the advice in the javadoc:
The best value of TASKS_PER_PHASER depends mainly on expected synchronization rates. A value as low as four may be appropriate for extremely small per-phase task bodies (thus high rates), or up to hundreds for extremely large ones.
The main reason for tiering is to reduce heavy synchronization contention, so if you have lightweight tasks, fewer tasks per phaser is better. It never hurts to profile different settings to tweak these things.
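For reference, here is a rough sketch of that tiering idea, adapted from the build pattern in the Phaser javadoc. The class name, the owners array, and the value 64 are assumptions chosen for illustration; the right TASKS_PER_PHASER for your workload is something to measure.

```java
import java.util.concurrent.Phaser;

class TieredPhasers {
    static final int TASKS_PER_PHASER = 64; // assumed value, tune by profiling

    // Registers one party per task index in [lo, hi) and records in owners[i]
    // the phaser that task i must later arrive on.
    static void build(Phaser[] owners, int lo, int hi, Phaser ph) {
        if (hi - lo > TASKS_PER_PHASER) {
            // Too many tasks for one phaser: split into chunks, one child each.
            for (int i = lo; i < hi; i += TASKS_PER_PHASER) {
                int j = Math.min(i + TASKS_PER_PHASER, hi);
                build(owners, i, j, new Phaser(ph));
            }
        } else {
            for (int i = lo; i < hi; i++) {
                ph.register();
                owners[i] = ph;
            }
        }
    }

    // Simulates taskCount tasks finishing one after another and reports
    // whether the root phaser terminated once all of them deregistered.
    static boolean runAll(int taskCount) {
        Phaser root = new Phaser();
        Phaser[] owners = new Phaser[taskCount];
        build(owners, 0, taskCount, root);
        for (Phaser owner : owners) {
            owner.arriveAndDeregister(); // arrive on the phaser registered with
        }
        return root.isTerminated();
    }

    public static void main(String[] args) {
        System.out.println("root terminated: " + runAll(100_000));
    }
}
```

With 100,000 tasks and 64 tasks per phaser, the root only holds one party per child, which stays far below the 65535 limit.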