Jordi
Jordi

Reputation: 23187

java stream: custom collector

This is my stream:

Stream<CompletableFuture<String>> futureStream = IntStream
    .iterate(1, n -> n < resultSet.getTotalCount() / pageSize, n -> n++)
    .mapToObj(pageNumber -> this.buildCompletableFutureofResultSetType(oid, pageNumber, pageSize));

CompletableFuture has a allOf:

public static CompletableFuture<Object> allOf(CompletableFuture<?>... cfs);

I've took a look on allOf method code:

public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
    return andTree(cfs, 0, cfs.length - 1);
}

I know I can first collect all of them in a List<CompletableFuture> and then pass it to allOf.

However, I'd like to chain this collector process into stream functional chain.

Any ideas?

Upvotes: 2

Views: 519

Answers (1)

rzwitserloot
rzwitserloot

Reputation: 102812

However, I'd like to chain this collector process into stream functional chain.

No, you don't.

The andTree method tries to build a balanced tree. It is not exactly trivial to try to construct a balanced tree on your own, and may not even be possible (the fact that CompletableFuture can build balanced trees is an implementation detail, in the sense that the API doesn't expose any of this, and consequently you can't use it; these are all private/package-private methods).

Collectors, in the mean time, are quite complicated notions; they need to be able to on-the-fly balance and fill a potentially parallel stream of incoming data whose ordering and sorting properties are variable. Hence, writing your own collector is possible but it is very easy to shoot yourself in the foot when you do that.

So, if you want to 'directly' chain your stream of futures into a single future that completed when all elements in the stream complicates, it's extremely complicated and will result in an unbalanced tree. Which is highly likely to less efficient than introducing the overhead of going via an intermediate list.

How many futures are we talking about? If the answer is 'less than 10000', the cost of that intermediate list is effectively nil. If the answer is 'more than 10000', perhaps the costs of that list are problematic, but having an unbalanced tree for those futures is vastly more problematic. Either way, directly collecting the stream is the wrong answer.

Thus, what you want:

streamOfFutures.collect(Collectors.collectingAndThen(Collectors.toList(), list -> CompletableFuture.allOf(list.toArray(CompletableFuture[]::new))));

// or possibly, as it's much cleaner and shorter...
// Remember, functional style / streams are a tool, not some sort of magic
// incantation that guarantees elegant code!

CompletableFuture.allOf(streamOfFutures.toArray(CompletableFuture[]::new));

Upvotes: 2

Related Questions