Reputation: 396
I have to run multiple external calls and then collect the results in a list.
I decided to use the CompletableFuture API, but the code I came up with is pretty ugly.
Example:
public class Main {
    public static void main(String[] args) {
        String prefix = "collection_";
        List<CompletableFuture<User>> usersResult = IntStream.range(1, 10)
                .boxed()
                .map(num -> prefix.concat("" + num))
                .map(name -> CompletableFuture.supplyAsync(
                        () -> callApi(name)))
                .collect(Collectors.toList());
        try {
            CompletableFuture.allOf(usersResult.toArray(new CompletableFuture[usersResult.size()])).get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
        List<User> users = usersResult //the result I need
                .stream()
                .map(userCompletableFuture -> {
                    try {
                        return userCompletableFuture.get();
                    } catch (InterruptedException | ExecutionException e) {
                        e.printStackTrace();
                    }
                    return null;
                })
                .filter(Objects::nonNull)
                .collect(Collectors.toList());
    }

    private static User callApi(String collection) {
        return new User(); //potentially time-consuming operation
    }
}
I have the following questions:
Can I simplify the try-catch block in the stream, where I'm mapping CompletableFuture to User?
Is it OK to do it this way (will all the futures be resolved in the stream)?
public class Main {
    public static void main(String[] args) {
        String prefix = "collection_";
        List<User> usersResult = IntStream.range(1, 10)
                .boxed()
                .map(num -> prefix.concat("" + num))
                .map(name -> CompletableFuture.supplyAsync(
                        () -> callApi(name)))
                .filter(Objects::nonNull)
                .map(userCompletableFuture -> {
                    try {
                        return userCompletableFuture.get();
                    } catch (InterruptedException | ExecutionException e) {
                        e.printStackTrace();
                    }
                    return null;
                })
                .collect(Collectors.toList());
    }

    private static User callApi(String collection) {
        return new User(); //potentially time-consuming operation
    }
}
Upvotes: 7
Views: 18963
Reputation: 15028
Alternatively, you can drop the CompletableFuture and use a parallel stream, as Didier mentioned:
// Wraps the call so that a failure yields an empty Optional instead of aborting the stream.
Optional<User> wrapApiCall(String name) {
    try {
        return Optional.of(callApi(name));
    } catch (Exception e) {
        e.printStackTrace();
        return Optional.empty();
    }
}

List<User> usersResult = IntStream.range(1, 10)
        .boxed()
        .parallel() // run the pipeline in parallel
        .map(num -> String.format("%s%d", prefix, num))
        .map(this::wrapApiCall)
        .filter(Optional::isPresent)
        .map(Optional::get)
        .collect(Collectors.toList());
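A self-contained sketch of the same idea is shown here. The `callApi` stub, its simulated failure for one input, and the use of a plain `String` in place of `User` are assumptions made only so that the example runs on its own.

```java
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class ParallelStreamDemo {

    // Hypothetical stand-in for the real external call; fails for one input on purpose.
    static String callApi(String collection) {
        if (collection.endsWith("_3")) {
            throw new IllegalStateException("simulated failure for " + collection);
        }
        return "user-of-" + collection;
    }

    // Turns a failed call into an empty Optional so one failure does not abort the stream.
    static Optional<String> wrapApiCall(String name) {
        try {
            return Optional.of(callApi(name));
        } catch (RuntimeException e) {
            System.err.println("call failed: " + e.getMessage());
            return Optional.empty();
        }
    }

    static List<String> fetchAll(String prefix) {
        return IntStream.range(1, 10)           // 1..9
                .parallel()                     // calls may run on several threads
                .mapToObj(num -> prefix + num)
                .map(ParallelStreamDemo::wrapApiCall)
                .filter(Optional::isPresent)
                .map(Optional::get)
                .collect(Collectors.toList());  // encounter order is preserved
    }

    public static void main(String[] args) {
        System.out.println(fetchAll("collection_"));
    }
}
```

Keep in mind that parallel streams run on the common ForkJoinPool by default, whose size is tied to the number of CPU cores, so long blocking calls may not get as much concurrency as you expect.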
Upvotes: 0
Reputation: 20608
For 1., you can skip the allOf().get() call entirely, since you are waiting on all futures one by one anyway.¹
For 2., you can simplify the try-catch by doing the following:
- use exceptionally() to handle exceptions directly in the future;
- use join() instead of get() to avoid checked exceptions (at that point you know no exception can occur, because exceptionally() has already handled it).
For 3., you cannot really make it less sequential, since you need at least two steps: create all the futures, and then process their results.
If you do everything in a single stream, it will create each future and then immediately wait on it before creating the next one, so you would lose the parallelism. You could use a parallel stream instead, but then there would not be much benefit in using CompletableFutures.
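To make the single-stream problem visible, here is a small sketch that records the order in which futures are submitted and joined. The class name, the event log and the trivial `i * 10` task are all made up for the illustration; the point is only the ordering of the events on a sequential stream.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class StreamLazinessDemo {

    // One pipeline: each element passes through every stage before the next element is created.
    static List<String> singlePipeline() {
        List<String> events = new ArrayList<>();
        IntStream.rangeClosed(1, 3)
                .mapToObj(i -> {
                    events.add("submit " + i);
                    return CompletableFuture.supplyAsync(() -> i * 10);
                })
                .map(future -> {
                    int value = future.join(); // blocks before the next future is even submitted
                    events.add("join " + value);
                    return value;
                })
                .collect(Collectors.toList());
        return events;
    }

    // Two phases: collect all futures first, then wait for them.
    static List<String> twoPhases() {
        List<String> events = new ArrayList<>();
        List<CompletableFuture<Integer>> futures = IntStream.rangeClosed(1, 3)
                .mapToObj(i -> {
                    events.add("submit " + i);
                    return CompletableFuture.supplyAsync(() -> i * 10);
                })
                .collect(Collectors.toList());
        futures.stream()
                .map(CompletableFuture::join)
                .forEach(value -> events.add("join " + value));
        return events;
    }

    public static void main(String[] args) {
        System.out.println(singlePipeline());
        System.out.println(twoPhases());
    }
}
```

In the single pipeline the log alternates (submit 1, join 10, submit 2, join 20, ...), so only one call is ever in flight. In the two-phase version all three submissions are logged before the first join.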
So the final code is:
List<CompletableFuture<User>> usersResult = IntStream.range(1, 10)
        .boxed()
        .map(num -> prefix.concat("" + num))
        .map(name -> CompletableFuture.supplyAsync(() -> callApi(name))
                .exceptionally(e -> {
                    e.printStackTrace();
                    return null;
                }))
        .collect(Collectors.toList());

List<User> users = usersResult
        .stream()
        .map(CompletableFuture::join)
        .filter(Objects::nonNull)
        .collect(Collectors.toList());
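The code above relies on two behaviours of CompletableFuture: join() reports a failure as an unchecked CompletionException rather than a checked exception, and exceptionally() replaces a failure with a fallback value so that join() returns normally. A minimal sketch of both, where `failingCall` is a hypothetical stand-in for a `callApi` that throws:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class ExceptionallyJoinDemo {

    // Hypothetical failing call, standing in for callApi.
    static String failingCall() {
        throw new IllegalStateException("simulated API failure");
    }

    // Without exceptionally(): join() rethrows the failure wrapped in an unchecked CompletionException.
    static String joinWithoutFallback() {
        try {
            return CompletableFuture.supplyAsync(ExceptionallyJoinDemo::failingCall).join();
        } catch (CompletionException e) {
            return "caught " + e.getCause().getClass().getSimpleName();
        }
    }

    // With exceptionally(): the failure is replaced by a fallback value, so join() returns normally.
    static String joinWithFallback() {
        return CompletableFuture.supplyAsync(ExceptionallyJoinDemo::failingCall)
                .exceptionally(e -> null)
                .join();
    }

    public static void main(String[] args) {
        System.out.println(joinWithoutFallback());
        System.out.println(joinWithFallback());
    }
}
```

The null fallback is what the later filter(Objects::nonNull) step removes from the result list.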
¹ Note that an allOf() call is still needed if you want your result to be a CompletableFuture<List<User>> as well, e.g.
final CompletableFuture<List<User>> result =
        CompletableFuture.allOf(usersResult.stream().toArray(CompletableFuture[]::new))
                .thenApply(__ -> usersResult
                        .stream()
                        .map(CompletableFuture::join)
                        .filter(Objects::nonNull)
                        .collect(Collectors.toList()));
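If you need this in more than one place, the same pattern can be pulled into a generic helper. This is only a sketch: the name `allAsList` is made up, and it assumes, as in the code above, that each future already handles its own failure and yields null in that case.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

class FutureSequenceDemo {

    // Hypothetical helper: turns a list of futures into one future holding the list of results.
    static <T> CompletableFuture<List<T>> allAsList(List<CompletableFuture<T>> futures) {
        return CompletableFuture
                .allOf(futures.toArray(new CompletableFuture<?>[0]))
                .thenApply(ignored -> futures.stream()
                        .map(CompletableFuture::join)   // does not block: every future is done here
                        .filter(Objects::nonNull)       // drop the null fallbacks of failed calls
                        .collect(Collectors.toList()));
    }

    public static void main(String[] args) {
        List<CompletableFuture<String>> futures = Arrays.asList(
                CompletableFuture.supplyAsync(() -> "a"),
                CompletableFuture.supplyAsync(() -> "b"));
        // Nothing blocks until the final join(), which is here only to keep main alive.
        allAsList(futures).thenAccept(System.out::println).join();
    }
}
```

Note that if any input future completes exceptionally, the future returned by allOf() does too, so the thenApply step is skipped and the combined future fails as well.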
Upvotes: 10