Jeffrey Guenther
Jeffrey Guenther

Reputation: 901

Why does this CompletableFuture not execute with a shared ExecutorService?

I'm playing with an idea I have for a dataflow language and I'm trying to use a an executor service to limit the number of available threads to the number of cores on my system and create a shared pool of threads.

 ExecutorService service = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
 Function a = new Function("A", service);
 Function b = new Function("B", service);
 Function c = new Function("C", service);
 a.addObserver(b);
 a.addObserver(c);
 a.change()

A is changed causing it to call it's evaluate(), then B and C are notified they need to update and they do.

In Function.java I have:

 public class Function implements Func{
    private boolean isComplete;
    private Set<Func> dependsOn;
    private Set<Func> observedBy;
    private ExecutorService service;

    public Function(String name, ExecutorService service){
        observedBy = new HashSet<>();
        dependsOn = new HashSet<>();
        this.isComplete = false;
        this.service = service;
    }

    public void addObserver(Func f){
        observedBy.add(f);
        f.addDependency(this);
    }

    private void evaluate() {
        boolean match = dependsOn.stream().allMatch(Func::isComplete);

        if (match) {
            this.isComplete = false;

            // inform each observer that I'm about to start to evaluate
            observedBy.forEach(func -> func.onDependentEvaluate(this));

            CompletableFuture.runAsync(() -> {
                try {
                    System.out.println("Doing some work");
                    int sleepTime = (int) (Math.random() * 5000);
                    System.out.println("Working for " + sleepTime);
                    Thread.sleep(sleepTime);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }, service).thenRun(this::onComplete);
        }
    }

    public void addDependency(Func f){
        dependsOn.add(f);
    }

    @Override
    public void change() {;
        evaluate();
    }

    @Override
    public void onComplete() {
        this.isComplete = true;
        observedBy.forEach((f) -> f.onDependentComplete(this));
    }

    @Override
    public void onDependentEvaluate(Func f) {
    }

    @Override
    public void onDependentComplete(Func func){
        evaluate();
    }

    @Override
    public boolean isComplete() {
        return this.isComplete;
    }
}

When I pass service to runAsync, only A executes. If I don't pass service and the CompletableFuture uses the ForkJoin.commonPool, the graph will execute as I expect. Is there a different way to share the ExecutorService? Any ideas why the next Futures don't get executed?

Upvotes: 1

Views: 948

Answers (1)

durron597
durron597

Reputation: 32323

Your problem is much simpler than you think it is. tl;dr your only problem is that you aren't shutting down the ExecutorService, so your program doesn't terminate.

Long version: In my environment, this program works as expected, and tasks B and C do, in fact, run. I have refactored your code to allow me to create an AbstractFunction, allowing me to override the behavior of what Function actually does, e.g.

public abstract class AbstractFunction implements Func {
  protected boolean isComplete;
  protected Set<Func> dependsOn;
  protected Set<Func> observedBy;
  protected ExecutorService service;
  protected String name;

  public AbstractFunction(String name, ExecutorService service) {
    observedBy = new HashSet<>();
    dependsOn = new HashSet<>();
    this.isComplete = false;
    this.service = service;
    this.name = name;
  }

  public void addObserver(Func f) {
    observedBy.add(f);
    f.addDependency(this);
  }

  public void addDependency(Func f) {
    dependsOn.add(f);
  }

  @Override
  public void onComplete() {
    this.isComplete = true;
    observedBy.forEach((f) -> f.onDependentComplete(this));
  }

  @Override
  public boolean isComplete() {
    return this.isComplete;
  }

  @Override
  public void change() { }

  @Override
  public void onDependentEvaluate(Func f) { }

  @Override
  public void onDependentComplete(Func func) { }
}

So, your original (effectively unchanged) Function is now this:

public class Function extends AbstractFunction implements Func {
  public Function(String name, ExecutorService service) {
    super(name, service);
  }

  protected void evaluate() {
    boolean match = dependsOn.stream().allMatch(Func::isComplete);

    if (match) {
      this.isComplete = false;

      // inform each observer that I'm about to start to evaluate
      observedBy.forEach(func -> func.onDependentEvaluate(this));

      CompletableFuture.runAsync(() -> {
        try {
          System.out.println("Doing some work");
          int sleepTime = (int) (Math.random() * 5000);
          System.out.println("Working for " + sleepTime);
          Thread.sleep(sleepTime);
        } catch (InterruptedException e) {
          System.out.println("I was interrupted, my name is: " + name);
          e.printStackTrace();
        }
      }, service).thenRun(this::onComplete);
    }
  }

  @Override
  public void change() {
    evaluate();
  }

  @Override
  public void onDependentComplete(Func f) {
    evaluate();
  }
}

Given this new structure, I create a ShutdownFunc to listen and shut the executor service when "C" finishes, i.e.

public class ShutdownFunc extends AbstractFunction {
  public ShutdownFunc(String name, ExecutorService service) {
    super(name, service);
  }

  @Override
  public void onDependentComplete(Func f) {
    System.out.println("Shutting down the system");
    try {
      service.shutdown();
      service.awaitTermination(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }
}

And main of course:

public class Main {
  public static void main(String... args) {
    ExecutorService service = Executors.newFixedThreadPool(Runtime.getRuntime()
        .availableProcessors());
    Function a = new Function("A", service);
    Function b = new Function("B", service);
    Function c = new Function("C", service);
    a.addObserver(b);
    a.addObserver(c);

    Func d = new ShutdownFunc("D", service);
    c.addObserver(d);

    a.change();
  }
}

Output:

Doing some work
Working for 4315
Doing some work
Working for 2074
Doing some work
Working for 2884
Shutting down the system

And the program correctly waits for completion, then terminates gracefully.

Upvotes: 1

Related Questions