Reputation: 23187
This is my stream:
Stream<CompletableFuture<String>> futureStream = IntStream
.iterate(1, n -> n < resultSet.getTotalCount() / pageSize, n -> n + 1)
.mapToObj(pageNumber -> this.buildCompletableFutureofResultSetType(oid, pageNumber, pageSize));
CompletableFuture has an allOf method:
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs);
I've taken a look at the allOf method's code:
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
return andTree(cfs, 0, cfs.length - 1);
}
I know I can first collect all of them in a List<CompletableFuture> and then pass it to allOf.
However, I'd like to chain this collector process into the stream's functional chain.
Any ideas?
Upvotes: 2
Views: 519
Reputation: 102812
"However, I'd like to chain this collector process into the stream's functional chain."
No, you don't.
The andTree method tries to build a balanced tree. Constructing a balanced tree on your own is not trivial, and may not even be possible: the fact that CompletableFuture can build balanced trees is an implementation detail. The API doesn't expose any of it, so you can't use it; these are all private or package-private methods.
Collectors, meanwhile, are complicated: a collector must be able to accumulate and merge, on the fly, a potentially parallel stream of incoming data whose ordering and sorting properties vary. Writing your own collector is possible, but it is very easy to shoot yourself in the foot when you do.
So, if you want to 'directly' chain your stream of futures into a single future that completes when all elements in the stream complete, it's extremely complicated and will result in an unbalanced tree, which is highly likely to be less efficient than accepting the overhead of going via an intermediate list.
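To make the "unbalanced tree" concrete, here is a rough sketch of one plausible way to fold a stream of futures directly, using Stream.reduce and thenCombine. The class and method names (LinearChainDemo, allOfByReduce) are made up for illustration. With a sequential stream, each step wraps the previous accumulated future, so n futures yield a chain n levels deep instead of the balanced tree that allOf builds internally.

```java
import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;

final class LinearChainDemo {

    // Hypothetical helper: folds the stream into a single future that
    // completes once every element has completed. On a sequential stream
    // the result is a left-deep chain (depth n), not a balanced tree.
    static <T> CompletableFuture<Void> allOfByReduce(Stream<CompletableFuture<T>> futures) {
        CompletableFuture<Void> done = CompletableFuture.completedFuture(null);
        return futures.reduce(
                done,
                // accumulator: wrap the chain built so far around the next future
                (acc, next) -> acc.thenCombine(next, (a, b) -> (Void) null),
                // combiner: only used by parallel streams to merge partial chains
                (left, right) -> left.thenCombine(right, (a, b) -> (Void) null));
    }

    public static void main(String[] args) {
        CompletableFuture<String> pending = new CompletableFuture<>();
        CompletableFuture<Void> all =
                allOfByReduce(Stream.of(pending, CompletableFuture.completedFuture("b")));
        System.out.println(all.isDone()); // one input is still pending
        pending.complete("a");
        System.out.println(all.isDone()); // every input has completed
    }
}
```

This works, but it is more code than the array-based call and gives up the balanced structure, which is the trade-off described above.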
How many futures are we talking about? If the answer is 'fewer than 10000', the cost of that intermediate list is effectively nil. If the answer is 'more than 10000', perhaps the cost of that list is problematic, but having an unbalanced tree for those futures is vastly more problematic. Either way, directly collecting the stream is the wrong answer.
Thus, what you want:
streamOfFutures.collect(Collectors.collectingAndThen(
    Collectors.toList(),
    list -> CompletableFuture.allOf(list.toArray(CompletableFuture[]::new))));
// or possibly, as it's much cleaner and shorter...
// Remember, functional style / streams are a tool, not some sort of magic
// incantation that guarantees elegant code!
CompletableFuture.allOf(streamOfFutures.toArray(CompletableFuture[]::new));
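If you also need the results, keep in mind that allOf yields a CompletableFuture<Void> and that a stream can only be consumed once. The following is a minimal, self-contained sketch of the intermediate-list approach under those constraints. fetchPage is a hypothetical stand-in for the question's buildCompletableFutureofResultSetType, and the page count is passed in directly rather than derived from a result set.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

final class AllOfDemo {

    // Hypothetical stand-in for the question's per-page future builder.
    static CompletableFuture<String> fetchPage(int pageNumber) {
        return CompletableFuture.supplyAsync(() -> "page-" + pageNumber);
    }

    // Completes with every page result, in page order, once all futures are done.
    static CompletableFuture<List<String>> fetchAll(int pageCount) {
        // Keep the futures in a list so they can be read again after allOf.
        List<CompletableFuture<String>> futures = IntStream.rangeClosed(1, pageCount)
                .mapToObj(AllOfDemo::fetchPage)
                .collect(Collectors.toList());

        return CompletableFuture
                .allOf(futures.toArray(new CompletableFuture<?>[0]))
                // All futures are complete at this point, so join() does not block.
                .thenApply(ignored -> futures.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));
    }

    public static void main(String[] args) {
        System.out.println(fetchAll(3).join());
    }
}
```

The list here is not just overhead: it is what lets you get at the individual results once the combined future completes.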
Upvotes: 2