Reputation: 1485
I am using an ExecutorService, and each web service call spawns about 9-10 Callable tasks and submits them to the ExecutorService thread pool. There is a single ExecutorService for the app, with a thread pool size of 100. When I submit the Callables, I have two for loops. The outer loop runs until either a specified timeout expires or the size of a 'Completed' HashSet equals the number of submitted tasks. The inner loop goes through the submitted tasks and, if isDone() == true, collects them in the 'Completed' HashSet. When the outer loop condition fails, I loop through the tasks in the 'Completed' HashSet and aggregate the results. I am sure there is a more elegant solution than using two loops.
What is the best way to be notified when either all the tasks complete or the timeout expires? Is there a framework, library, or design pattern for this?
Upvotes: 1
Views: 211
Reputation: 11280
You can use CompletableFuture for this. Submit all your calculations using CompletableFuture.supplyAsync() and collect these in your HashSet. Build an aggregate CompletableFuture with thenCombine(). Call get() on the aggregate with your timeout, and if a TimeoutException occurs, complete all the calculation futures with a default value. Then the aggregate will return the partial result immediately.
This is also a push style approach, but all the notification logic is done by the CompletableFuture class for you.
Example (using the sum of Integers as the aggregation):
    import java.util.HashSet;
    import java.util.Set;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    public class CompleteableFutureGather {

        public static void main(String[] args) {
            ExecutorService executorService = Executors.newFixedThreadPool(10);
            Set<CompletableFuture<Integer>> calculations = new HashSet<>();
            CompletableFuture<Integer> sum = CompletableFuture.completedFuture(0);
            for (int i = 0; i < 100; i++) {
                // submit to thread pool
                CompletableFuture<Integer> calculation =
                        CompletableFuture.supplyAsync(CompleteableFutureGather::longCalculation, executorService);
                calculations.add(calculation);
                sum = sum.thenCombine(calculation, Integer::sum); // set up future aggregation
            }
            int total = 0;
            try {
                total = sum.get(5, TimeUnit.SECONDS); // set timeout
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } catch (ExecutionException e) {
                // the cause is not guaranteed to be a RuntimeException, so wrap it instead of casting
                throw new RuntimeException(e.getCause());
            } catch (TimeoutException e) {
                // preemptively complete calculations with default value, those already completed will be unaffected
                calculations.forEach(integerCompletableFuture -> integerCompletableFuture.complete(0));
                total = sum.getNow(0); // everything is complete so partial aggregation will be returned immediately
            } finally {
                // drop queued tasks and interrupt running ones so the JVM can exit
                executorService.shutdownNow();
            }
            System.out.println(total);
        }

        // simulates a slow task: sleeps for 5 seconds, then returns 1
        private static Integer longCalculation() {
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return 1;
        }
    }
Upvotes: 1
Reputation: 2026
You basically have two options, pull or push.
Pull is what you've already tried: send out all the async tasks, keep the references to them, and call isDone() until they've all finished.
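If you stay with the pull style, the two polling loops can probably be replaced by a single blocking call. The sketch below assumes the tasks are Callable<Integer> and that summing is the aggregation; the class name PullGather and the method gather are made up for illustration. It relies on ExecutorService.invokeAll with a timeout, which returns once all tasks are done or the timeout expires and cancels whatever has not finished.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class PullGather {

    // Hypothetical helper: runs all tasks, waits until they finish or the
    // timeout expires, and sums the results of the tasks that finished in time.
    static int gather(ExecutorService pool, List<Callable<Integer>> tasks,
                      long timeout, TimeUnit unit) throws InterruptedException {
        // Blocks until all tasks are done or the timeout expires;
        // tasks that have not completed by then are cancelled.
        List<Future<Integer>> futures = pool.invokeAll(tasks, timeout, unit);
        int total = 0;
        for (Future<Integer> future : futures) {
            if (future.isCancelled()) {
                continue; // did not finish before the timeout
            }
            try {
                total += future.get(); // already done, so this returns at once
            } catch (ExecutionException e) {
                // the task itself threw; this sketch simply skips it
            }
        }
        return total;
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        List<Callable<Integer>> tasks = new ArrayList<>();
        tasks.add(() -> 1);
        tasks.add(() -> 2);
        tasks.add(() -> { Thread.sleep(10_000); return 100; }); // too slow
        try {
            // only the two fast tasks are expected to contribute
            System.out.println(gather(pool, tasks, 500, TimeUnit.MILLISECONDS));
        } finally {
            pool.shutdownNow();
        }
    }
}
```

Note that invokeAll cancels the unfinished tasks, which interrupts them. If they must keep running after the timeout, this variant is not a fit.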
Push, on the other hand, separates the invocation from the notification. You'd invoke the async tasks and the method would then return immediately. The notification would be handled by the tasks themselves: each one would need to signal when its work was completed.
This notification is easily implemented with the Observer pattern, or with CDI Events if you're using Java EE.
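A minimal sketch of the push style with a plain observer, assuming Callable<Integer> tasks and a sum as the aggregation. The names PushGather and gather are hypothetical, and an IntConsumer stands in for the observer interface. Each task reports its own result when it finishes, and the caller waits on a CountDownLatch with a timeout instead of polling.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntConsumer;

public class PushGather {

    // Hypothetical helper: each task pushes its result to an observer when it
    // finishes; the caller waits until all have reported or the timeout expires.
    static int gather(ExecutorService pool, List<Callable<Integer>> tasks,
                      long timeout, TimeUnit unit) throws InterruptedException {
        AtomicInteger total = new AtomicInteger();
        CountDownLatch remaining = new CountDownLatch(tasks.size());
        // The observer: called by a task on its own thread with its result.
        IntConsumer observer = value -> total.addAndGet(value);

        for (Callable<Integer> task : tasks) {
            pool.execute(() -> {
                try {
                    observer.accept(task.call()); // notify on completion
                } catch (Exception e) {
                    // failed or interrupted task: nothing is reported
                } finally {
                    remaining.countDown(); // finished, successfully or not
                }
            });
        }
        // Returns early once every task has counted down, otherwise at the timeout.
        remaining.await(timeout, unit);
        return total.get(); // snapshot of whatever has been reported so far
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        List<Callable<Integer>> tasks = new ArrayList<>();
        tasks.add(() -> 1);
        tasks.add(() -> 2);
        tasks.add(() -> { Thread.sleep(10_000); return 100; }); // too slow
        try {
            System.out.println(gather(pool, tasks, 500, TimeUnit.MILLISECONDS));
        } finally {
            pool.shutdownNow();
        }
    }
}
```

Unlike a cancelling approach, late tasks keep running here unless the pool is shut down, and they still update the shared total after gather has returned its snapshot.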
I personally prefer the push method as it cleans up the code and separates the concerns of invoking the tasks and handling the results. Either way is just fine though.
Upvotes: 1