Reputation: 10082
Given a list of functions, where Context
is some custom type:
List<Function<Context, CompletableFuture<Context>>> preprocessors;
I need to get a CompletableFuture<Context>
that completes once every one of those functions has been executed in order, feeding the async result of each iteration into the next one. So I need a general solution (for a variable list size) for the following statement, which handles 3 elements in the list:
Context context;
CompletableFuture<Context> promise = preprocessors.get(0).apply(context)
.thenCompose((c) -> preprocessors.get(1).apply(c))
.thenCompose((c) -> preprocessors.get(2).apply(c));
Any ideas?
To get rid of indices, the above could also look like this:
Iterator<Function<Context, CompletableFuture<Context>>> it = preprocessors.iterator();
Context context;
CompletableFuture<Context> promise = it.next().apply(context)
.thenCompose((c) -> it.next().apply(c))
.thenCompose((c) -> it.next().apply(c));
Still, how do I generalise this further into a variable element count?
Upvotes: 2
Views: 1577
Reputation: 3919
I think I managed to do this with reduce like this:
CompletableFuture<Context> res =
preprocessors.stream()
.reduce(CompletableFuture.completedFuture(context),
(future, processor) -> future.thenCompose(processor::apply),
(old, current) -> current);
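The accumulator (the second param) receives the future and the processor and produces the next future. So the combiner (the third param) can safely throw away the "old" future and just return the new one. That only holds for a sequential stream; on a parallel stream, discarding a partial result would drop the processors chained in it.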
Untested! It may not work as advertised :)
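For anyone who wants to try the reduce idea, here is a minimal self-contained sketch. The class name ReducePipeline, the method name runAll, and the use of String in place of Context are assumptions made for illustration only. It relies on the stream being sequential.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical helper class; names are illustrative, not from the question.
class ReducePipeline {

    // Chains the steps left to right; each step receives the previous step's result.
    static <T> CompletableFuture<T> runAll(
            List<Function<T, CompletableFuture<T>>> steps, T initial) {
        return steps.stream() // must stay sequential, see the combiner below
                .reduce(CompletableFuture.completedFuture(initial),
                        // accumulator: append the next step to the chain built so far
                        (future, step) -> future.thenCompose(step),
                        // combiner: only relevant for parallel streams, where
                        // dropping "previous" would lose steps
                        (previous, current) -> current);
    }

    public static void main(String[] args) {
        List<Function<String, CompletableFuture<String>>> steps = List.of(
                s -> CompletableFuture.completedFuture(s + "a"),
                s -> CompletableFuture.supplyAsync(() -> s + "b"),
                s -> CompletableFuture.completedFuture(s + "c"));
        System.out.println(runAll(steps, ">").join()); // prints >abc
    }
}
```

With an empty list, the result is simply an already completed future holding the initial value.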
Upvotes: 1
Reputation: 158
Assuming you are only interested in the result of the last CompletableFuture (promise), you could just take the promise from each iteration and compose upon it.
Taking either an index or foreach loop you could use the following code snippet for your purpose:
CompletableFuture<Context> promise = CompletableFuture.completedFuture(context);
for(Function<Context, CompletableFuture<Context>> preprocessor: preprocessors) {
promise = promise.thenCompose(ctx -> preprocessor.apply(ctx));
}
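To check that the loop really runs the steps one after another even when each step is asynchronous, a small sketch along these lines can help. LoopPipeline, runAll, and the Integer steps are hypothetical stand-ins for the real Context and preprocessors. The arithmetic is chosen so that any other execution order would give a different number.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical helper class; names are illustrative, not from the question.
class LoopPipeline {

    // Starts from a completed future and appends one thenCompose stage per step.
    static <T> CompletableFuture<T> runAll(
            List<Function<T, CompletableFuture<T>>> steps, T initial) {
        CompletableFuture<T> promise = CompletableFuture.completedFuture(initial);
        for (Function<T, CompletableFuture<T>> step : steps) {
            promise = promise.thenCompose(step);
        }
        return promise;
    }

    public static void main(String[] args) {
        List<Function<Integer, CompletableFuture<Integer>>> steps = new ArrayList<>();
        steps.add(n -> CompletableFuture.supplyAsync(() -> n + 1));
        steps.add(n -> CompletableFuture.supplyAsync(() -> n * 10));
        steps.add(n -> CompletableFuture.supplyAsync(() -> n - 3));
        // (1 + 1) * 10 - 3 = 17, which only comes out if the steps run in list order
        System.out.println(runAll(steps, 1).join());
    }
}
```

Starting from completedFuture also means an empty list needs no special case: the initial value comes straight back.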
Upvotes: 3
Reputation: 73558
Looking into reduce()
it seemed to be hard to work around the special handling for the first element, and impossible to guarantee sequential handling. Using just regular Java you could write
List<Function<Context, CompletableFuture<Context>>> preprocessors;
Context context;
Iterator<Function<Context, CompletableFuture<Context>>> itr = preprocessors.iterator();
CompletableFuture<Context> promise = itr.next().apply(context);
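while (itr.hasNext()) {
    // Fetch the next function here, not inside the lambda: the lambda may run
    // later, so the iterator would not advance and the loop would never end.
    Function<Context, CompletableFuture<Context>> next = itr.next();
    promise = promise.thenCompose(next);
}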
Upvotes: 0
Reputation: 4048
Perhaps something like RxJava might be useful:
Flowable.fromIterable(preprocessors)
.map(function -> function.apply(context))
.flatMap(Flowable::fromFuture)
.subscribe(c -> {
// Whatever it is you want to do with the context that is returned
});
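Though I must admit I'm not sure this ensures sequential execution of all futures: as written, every function is applied to the same initial context rather than to the previous result, and flatMap does not guarantee ordering.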
https://github.com/ReactiveX/RxJava
Upvotes: 0