Ashish Thukral
Ashish Thukral

Reputation: 1485

executorThreadPool check when tasks complete or timeout expires

I am using executorservice and each webservice call spawns about 9-10 Callable tasks and submits to the executorService Thread pool. There is 1 single executorService for the app with thread pool size of 100. when i submit the callables, i have a 2 For loops. The outer loop runs either till a specified timeout expires Or Completed hashset size == submitted tasks size; and the inner loop goes through the Callables and if isDone()== true then those are collected in a 'Completed' hashset. When the outerloop condition fails, I loop through the callables in 'Completed'hashset and aggregate the results. The thing is i am sure there is a more elegant solution than using 2 loops.

what is the best way that I can be notified if either all the tasks Complete or the timeout expires ? Any framework, library, etc or design pattern ?

Upvotes: 1

Views: 211

Answers (2)

bowmore
bowmore

Reputation: 11280

You can use CompletableFuture for this.

  • Do the tasks in CompletableFuture.supplyAsync() collect these in your HashSet.
  • Aggregate all these in one CompletableFuture with thenCombine()
  • On the aggregate perform get() with your timeout, and if a TimeoutException occurs, complete all the calculation futures with a default value. Then the aggregate will return the partial result immediately.

This is also a push style approach, but all the notification logic is done by the CompletableFuture class for you.

Example (using sum of Integer as aggregation)

public static void main(String[] args) {
    ExecutorService executorService = Executors.newFixedThreadPool(10);

    Set<CompletableFuture<Integer>> calculations = new HashSet<>();

    CompletableFuture<Integer> sum = CompletableFuture.completedFuture(0);

    for (int i = 0; i < 100; i++) {
        // submit to thread pool
        CompletableFuture<Integer> calculation =
                CompletableFuture.supplyAsync(CompleteableFutureGather::longCalculation, executorService);
        calculations.add(calculation);
        sum = sum.thenCombine(calculation, Integer::sum); // set up future aggregation
    }
    int total = 0;
    try {
        total = sum.get(5, TimeUnit.SECONDS); // set timeout
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    } catch (ExecutionException e) {
        throw (RuntimeException) e.getCause();
    } catch (TimeoutException e) {
        // preemptively complete calculations with default value, those already completed will be unaffected
        calculations.forEach(integerCompletableFuture -> integerCompletableFuture.complete(0));
        total = sum.getNow(0); // everything is complete so partial aggregation will be returned immediately
    }
    System.out.println(total);
}

private static Integer longCalculation() {
    try {
        Thread.sleep(5000);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
    return 1;
}

Upvotes: 1

nlloyd
nlloyd

Reputation: 2026

You basically have two options, pull or push.

Pull is what you've already tried - send out all the async tasks, keep the references to them and call isDone() until they've all finished.

Push, on the other hand, separates the invoke and notification tasks. You'd invoke the async tasks and the method would then immediately return. The notification would be handled by the tasks themselves, they would need to inform when their work was completed.

This notification is easily implemented by the Observer Pattern, or CDI Events if you're using Java EE.

I personally prefer the push method as it cleans up the code and separates the concerns of invoking the tasks and handling the results. Either way is just fine though.

Upvotes: 1

Related Questions