Les
Les

Reputation: 241

FixedThreadPool executor limited by some termination condition

Maybe, a novice question, but it bothered me and under the pile of simple tutorials and docs, I didn't find my answer.

Question. What is the best way in Java, using high level parallelism patterns from JDK(1.8), to implement the next? A fixed pool of N threads is doing the same pre-defined task until the termination condition reached. So, the amount of tasks isn't pre-defined, and condition is an outside real-time trigger. We must react to the termination eagerly but not consume too many resources switching context and stealing CPU time from the workers' threads. Let's say that we have only two-four weak physical threads to spend much on the control thread.

If it helps, current thoughts are implemented in the next code, but dymanic tasks queue while mostly sleeping control cycle looks not enought neat solution for me.

try {
    mainCycle(agent, executor, terminationCondition);
} catch (InterruptedException e) {
    log.warn(INTERRUPTED_EX, e);
} finally {
    executor.shutdownNow();
}

 private static void mainCycle(Callable<Long> agent,
                                  ExecutorService executor,
                                  Supplier<Boolean> terminationCondition
) throws InterruptedException {

    final List<Future<Long>> runFutureResults = executor.
            invokeAll(Collections.nCopies(parallelAgents, agent));

    int tasksReserve = BASE_MULTIPLIER;
    //noinspection MethodCallInLoopCondition, by design
    while (!terminationCondition.get()) {
        tasksReserve = addTasksIfNeed(executor, runFutureResults);
        Thread.sleep(1000);
    }
}

Upvotes: 0

Views: 136

Answers (1)

Chris Mowforth
Chris Mowforth

Reputation: 6723

Either use some kind of coordination mechanism (phaser, which was introduced in Java 7 is useful when you need to add more jobs on the fly) or just keep an external flag you set when done:

ExecutorService service = Executors.newFixedThreadPool(N);
volatile boolean done = false;

// add your jobs:
service.submit(() -> {
    while (!done) {
        // do something
    }
});

// set your flag
done = true;

It's enough for the flag to be volatile because only one thread is mutating the value; the threads inside the executor just need visibility into when it changes.

Upvotes: 3

Related Questions