Reputation: 901
I'm playing with an idea I have for a dataflow language, and I'm trying to use an executor service to create a shared pool of threads, limited to the number of cores on my system.
ExecutorService service = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
Function a = new Function("A", service);
Function b = new Function("B", service);
Function c = new Function("C", service);
a.addObserver(b);
a.addObserver(c);
a.change();
Changing A causes it to call its evaluate(); then B and C are notified that they need to update, and they do.
In Function.java I have:
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;

public class Function implements Func {
    private boolean isComplete;
    private Set<Func> dependsOn;
    private Set<Func> observedBy;
    private ExecutorService service;

    public Function(String name, ExecutorService service) {
        observedBy = new HashSet<>();
        dependsOn = new HashSet<>();
        this.isComplete = false;
        this.service = service;
    }

    public void addObserver(Func f) {
        observedBy.add(f);
        f.addDependency(this);
    }

    private void evaluate() {
        // only evaluate once every dependency has completed
        boolean match = dependsOn.stream().allMatch(Func::isComplete);
        if (match) {
            this.isComplete = false;
            // inform each observer that I'm about to start to evaluate
            observedBy.forEach(func -> func.onDependentEvaluate(this));
            CompletableFuture.runAsync(() -> {
                try {
                    System.out.println("Doing some work");
                    int sleepTime = (int) (Math.random() * 5000);
                    System.out.println("Working for " + sleepTime);
                    Thread.sleep(sleepTime);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }, service).thenRun(this::onComplete);
        }
    }

    public void addDependency(Func f) {
        dependsOn.add(f);
    }

    @Override
    public void change() {
        evaluate();
    }

    @Override
    public void onComplete() {
        this.isComplete = true;
        observedBy.forEach((f) -> f.onDependentComplete(this));
    }

    @Override
    public void onDependentEvaluate(Func f) {
    }

    @Override
    public void onDependentComplete(Func func) {
        evaluate();
    }

    @Override
    public boolean isComplete() {
        return this.isComplete;
    }
}
When I pass service to runAsync, only A executes. If I don't pass service, so that the CompletableFuture uses ForkJoinPool.commonPool(), the graph executes as I expect. Is there a different way to share the ExecutorService? Any ideas why the subsequent futures don't get executed?
Upvotes: 1
Views: 948
Reputation: 32323
Your problem is much simpler than you think it is. tl;dr: your only problem is that you aren't shutting down the ExecutorService, so your program doesn't terminate.
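The reason a missing shutdown matters is that the workers of a pool created with Executors.newFixedThreadPool are non-daemon threads, so idle workers keep the JVM alive. A minimal sketch that checks this in isolation follows; the class name DaemonCheck and the helper workerIsDaemon are hypothetical names used only for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class, not part of the code under discussion.
class DaemonCheck {

    // Runs a task on the pool and reports whether its worker thread is a daemon.
    static boolean workerIsDaemon(ExecutorService service) {
        return CompletableFuture
                .supplyAsync(() -> Thread.currentThread().isDaemon(), service)
                .join();
    }

    public static void main(String[] args) {
        ExecutorService service = Executors.newFixedThreadPool(2);
        try {
            // Prints "worker is daemon: false" with the default thread factory.
            System.out.println("worker is daemon: " + workerIsDaemon(service));
        } finally {
            // Without this call the idle workers keep the JVM running.
            service.shutdown();
        }
    }
}
```

Commenting out the shutdown() call should leave the process hanging after the line is printed, which matches the symptom of a program that "doesn't terminate".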
Long version: in my environment this program works as expected, and tasks B and C do, in fact, run. I have refactored your code into an AbstractFunction so that I can override what a Function actually does, e.g.
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutorService;

public abstract class AbstractFunction implements Func {
    protected boolean isComplete;
    protected Set<Func> dependsOn;
    protected Set<Func> observedBy;
    protected ExecutorService service;
    protected String name;

    public AbstractFunction(String name, ExecutorService service) {
        observedBy = new HashSet<>();
        dependsOn = new HashSet<>();
        this.isComplete = false;
        this.service = service;
        this.name = name;
    }

    public void addObserver(Func f) {
        observedBy.add(f);
        f.addDependency(this);
    }

    public void addDependency(Func f) {
        dependsOn.add(f);
    }

    @Override
    public void onComplete() {
        this.isComplete = true;
        observedBy.forEach((f) -> f.onDependentComplete(this));
    }

    @Override
    public boolean isComplete() {
        return this.isComplete;
    }

    // no-op defaults; subclasses override the callbacks they care about
    @Override
    public void change() { }

    @Override
    public void onDependentEvaluate(Func f) { }

    @Override
    public void onDependentComplete(Func func) { }
}
So, your original (effectively unchanged) Function is now this:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;

public class Function extends AbstractFunction implements Func {
    public Function(String name, ExecutorService service) {
        super(name, service);
    }

    protected void evaluate() {
        // only evaluate once every dependency has completed
        boolean match = dependsOn.stream().allMatch(Func::isComplete);
        if (match) {
            this.isComplete = false;
            // inform each observer that I'm about to start to evaluate
            observedBy.forEach(func -> func.onDependentEvaluate(this));
            CompletableFuture.runAsync(() -> {
                try {
                    System.out.println("Doing some work");
                    int sleepTime = (int) (Math.random() * 5000);
                    System.out.println("Working for " + sleepTime);
                    Thread.sleep(sleepTime);
                } catch (InterruptedException e) {
                    System.out.println("I was interrupted, my name is: " + name);
                    e.printStackTrace();
                }
            }, service).thenRun(this::onComplete);
        }
    }

    @Override
    public void change() {
        evaluate();
    }

    @Override
    public void onDependentComplete(Func f) {
        evaluate();
    }
}
Given this new structure, I create a ShutdownFunc that listens for "C" to finish and then shuts down the executor service, i.e.
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

public class ShutdownFunc extends AbstractFunction {
    public ShutdownFunc(String name, ExecutorService service) {
        super(name, service);
    }

    @Override
    public void onDependentComplete(Func f) {
        System.out.println("Shutting down the system");
        try {
            // stop accepting new tasks, then wait for running ones to finish
            service.shutdown();
            service.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
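One caveat worth checking: if onDependentComplete is invoked on one of the pool's own worker threads (which seems likely here, since it is reached through thenRun on a task that ran in the pool), awaitTermination cannot report success from inside it, because the calling worker is itself still busy. The following is a minimal sketch of that effect in isolation; the class name AwaitFromWorkerDemo and the 200 ms timeout are illustrative assumptions, not part of the code above.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the code above.
class AwaitFromWorkerDemo {

    // Calls shutdown() and awaitTermination() from inside a pool task and
    // returns what awaitTermination() reported.
    static boolean awaitInsideWorker() {
        ExecutorService service = Executors.newFixedThreadPool(2);
        return CompletableFuture.supplyAsync(() -> {
            service.shutdown();
            try {
                // This task is still running, so the pool cannot reach the
                // terminated state before the timeout elapses.
                return service.awaitTermination(200, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }, service).join();
    }

    public static void main(String[] args) {
        // Prints "terminated while inside worker: false".
        System.out.println("terminated while inside worker: " + awaitInsideWorker());
    }
}
```

The shutdown() call on its own is enough to let the program exit once the remaining tasks finish; under the assumption above, the awaitTermination call in ShutdownFunc would mostly add a delay of up to its timeout.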
And main of course:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Main {
    public static void main(String... args) {
        ExecutorService service = Executors.newFixedThreadPool(
                Runtime.getRuntime().availableProcessors());
        Function a = new Function("A", service);
        Function b = new Function("B", service);
        Function c = new Function("C", service);
        a.addObserver(b);
        a.addObserver(c);
        // D observes C and shuts the pool down once C completes
        Func d = new ShutdownFunc("D", service);
        c.addObserver(d);
        a.change();
    }
}
Output:
Doing some work
Working for 4315
Doing some work
Working for 2074
Doing some work
Working for 2884
Shutting down the system
And the program correctly waits for completion, then terminates gracefully.
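An alternative is to keep the shutdown on the main thread: hold on to the futures, wait for the leaves of the graph, and shut the pool down in a finally block. The sketch below assumes a simplified stand-in for the graph (A feeding B and C) built directly from CompletableFuture calls rather than from the Function classes; the class name MainThreadShutdown and the finished list are hypothetical and exist only for illustration.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class; a simplified stand-in for the A -> B, C graph.
class MainThreadShutdown {

    // Runs A, then B and C on a shared pool, and returns the completion order.
    static List<String> runGraph() {
        ExecutorService service = Executors.newFixedThreadPool(
                Runtime.getRuntime().availableProcessors());
        List<String> finished = new CopyOnWriteArrayList<>();
        try {
            CompletableFuture<Void> a =
                    CompletableFuture.runAsync(() -> finished.add("A"), service);
            // B and C both depend on A and run on the same shared pool.
            CompletableFuture<Void> b = a.thenRunAsync(() -> finished.add("B"), service);
            CompletableFuture<Void> c = a.thenRunAsync(() -> finished.add("C"), service);
            // Block the calling thread until both leaves are done.
            CompletableFuture.allOf(b, c).join();
        } finally {
            // All work has finished (or failed), so the pool can be released.
            service.shutdown();
        }
        return finished;
    }

    public static void main(String[] args) {
        System.out.println(runGraph());
    }
}
```

A always appears first in the returned list; the relative order of B and C depends on scheduling. Shutting down from the caller avoids having a graph node own the lifetime of the pool it runs on.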
Upvotes: 1