Reputation: 865
I am using an external library that has the code from below. I am sending a lot of commands and am interesed in the result for statistics to check how many calls failed and how many succeeded
public Future<CommandResult> sendCommand(Command command) {
return command.execute();
}
CommandResult can be success or failure
However, if I use client.sendCommand(command).get();
then, I am waiting for the result synchronously, meanwhile the app is being blocked.
I would like to check only later (after 30 seconds which calls succeded and which failed). I am guaranteed to get an answer in 10 seconds. The problem is that the app waits for the computation to complete, and then retrieves its result.
I was thinking about this approach based on the answers:
List<Future< CommandResult >> futures = new ArrayList<>();
for(Command command: commands) {
futures.add(client.sendCommand(command));
}
//in a scheduler, 30+ seconds later
for (Future<Boolean> future : futures) {
saveResult(future.get());
}
Upvotes: 2
Views: 349
Reputation: 10814
If converting the Future
instances to CompletableFuture
(see answer from Panagiotis Bougioukos) is an option, then you can implement a simple helper function for turning a Stream<CompletableFuture<T>>
into a CompletableFuture<Stream<T>>
:
public static <T> CompletableFuture<Stream<T>> collect(Stream<CompletableFuture<T>> futures) {
return futures
.map(future -> future.thenApply(Stream::of))
.reduce(
CompletableFuture.completedFuture(Stream.empty()),
(future1, future2) ->
future1
.thenCompose(stream1 ->
future2
.thenApply(stream2 ->
concat(stream1, stream2)))
);
}
Essentially this reduces the stream of futures in parallel to a future of a stream.
If you use this e.g. on a stream of futures of strings, it will return a future that completes once the last of the individual futures completed:
Stream<CompletableFuture<String>> streamOfFutures = ...
CompletableFuture<Stream<String>> futureOfStream = collect(streamOfFutures);
// Prints a list of strings once the "slowest" future completed
System.out.println(futureOfStream.get().toList());
Upvotes: 0
Reputation: 116878
I would like to check only later (after 30 seconds which calls succeeded and which failed). I am guaranteed to get an answer in 10 seconds. The problem is that the app waits for the computation to complete, and then retrieves its result.
If you want check on the results at a later time then your solution with Future<Boolean>
should be fine. The jobs will run in the background and you will get the results form then when you call future.get()
. Each of those get()
calls do block however.
If you want to get the results as they come in, I would use an ExecutorCompletionService
which you can poll anytime to see if you have results. The poll is non-blocking.
// create your thread pool using fixed or other pool
Executor<Result> threadPool = Executors.newFixedThreadPool(5);
// wrap the Executor in a CompletionService
CompletionService<Boolean> completionService =
new ExecutorCompletionService<>(e);
// submit jobs to the pool through the ExecutorCompletionService
for (Job job : jobs) {
completionService.submit(job);
}
// after we have submitted all of the jobs we can shutdown the Executor
// the jobs submitted will continue to run
threadPool.shutdown();
...
// some point later you can do
int jobsRunning = jobs.size();
for (int jobsRunning = jobs.size(); jobsRunning > 0; ) {
// do some processing ...
// are any results available?
Boolean result = completionService.poll();
if (result != null) {
// process a result if available
jobsRunning--;
}
}
Note that you will need to track how many jobs you submitted to the CompletionService
.
Upvotes: 1
Reputation: 18929
Future
is a legacy java feature which does not allow for reactive non blocking functionalities. The CompletableFuture
is a later enhancement in Java in order to allow such reactive non blocking functionalities.
You can based on this previous SO answer try to convert your Future
into a CompletableFuture
and then you will have methods exposed to take advantage of non blocking execution.
Check the following example and modify accordingly.
public class Application {
public static void main(String[] args) throws ParseException {
Future future = new SquareCalculator().calculate(10);
CompletableFuture<Integer> completableFuture = makeCompletableFuture(future);
System.out.println("before apply");
completableFuture.thenApply(s -> {
System.out.println(s);
return s;
});
System.out.println("after apply method");
}
public static <T> CompletableFuture<T> makeCompletableFuture(Future<T> future) {
if (future.isDone())
return transformDoneFuture(future);
return CompletableFuture.supplyAsync(() -> {
try {
if (!future.isDone())
awaitFutureIsDoneInForkJoinPool(future);
return future.get();
} catch (ExecutionException e) {
throw new RuntimeException(e);
} catch (InterruptedException e) {
// Normally, this should never happen inside ForkJoinPool
Thread.currentThread().interrupt();
// Add the following statement if the future doesn't have side effects
// future.cancel(true);
throw new RuntimeException(e);
}
});
}
private static <T> CompletableFuture<T> transformDoneFuture(Future<T> future) {
CompletableFuture<T> cf = new CompletableFuture<>();
T result;
try {
result = future.get();
} catch (Throwable ex) {
cf.completeExceptionally(ex);
return cf;
}
cf.complete(result);
return cf;
}
private static void awaitFutureIsDoneInForkJoinPool(Future<?> future)
throws InterruptedException {
ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker() {
@Override public boolean block() throws InterruptedException {
try {
future.get();
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
return true;
}
@Override public boolean isReleasable() {
return future.isDone();
}
});
}
}
And then the class to create an example Future
public class SquareCalculator {
private ExecutorService executor
= Executors.newSingleThreadExecutor();
public Future<Integer> calculate(Integer input) {
return executor.submit(() -> {
Thread.sleep(1000);
return input * input;
});
}
}
Will result into
Upvotes: 1