Oleg Sklyar

Reputation: 10082

Chaining a variable number of promises (CompletableFuture) in Java

Given a list of functions, where Context is some custom type:

List<Function<Context, CompletableFuture<Context>>> preprocessors;

I need to get a CompletableFuture<Context> that results from executing every one of those functions in order, feeding the async result of each step into the next one. So I need a general solution (for a variable list size) for the following statement, shown here with 3 elements in the list:

Context context;

CompletableFuture<Context> promise = preprocessors.get(0).apply(context)
    .thenCompose((c) -> preprocessors.get(1).apply(c))
    .thenCompose((c) -> preprocessors.get(2).apply(c));

Any ideas?

To get rid of indices, the above could also look like this:

Iterator<Function<Context, CompletableFuture<Context>>> it = preprocessors.iterator();

Context context;

CompletableFuture<Context> promise = it.next().apply(context)
    .thenCompose((c) -> it.next().apply(c))
    .thenCompose((c) -> it.next().apply(c));

Still, how do I generalise this to a variable number of elements?

Upvotes: 2

Views: 1577

Answers (4)

ivant

Reputation: 3919

I think I managed to do this with reduce like this:

        CompletableFuture<Context> res =
            preprocessors.stream()
                         .reduce(CompletableFuture.completedFuture(context),
                                 (future, processor) -> future.thenCompose(processor::apply),
                                 (old, current) -> current);

The accumulator (the second param) receives the future and the processor and produces the next future. So, on a sequential stream, the combiner (the third param) can safely throw away the "old" future and just return the new one.

Untested! It may not work as advertised :)
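
For anyone who wants to try it, here is a minimal runnable sketch of the reduce idea. The class name ReduceChainSketch, the helper chain and the use of String in place of Context are assumptions made for illustration only. It relies on the stream being sequential: the combiner that drops the left future does not merge anything, so this should not be used with a parallel stream.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical demo class; T stands in for the question's Context type.
class ReduceChainSketch {

    // Folds the steps into one future: each step is composed onto the
    // future produced by the previous step, in list order.
    static <T> CompletableFuture<T> chain(T initial,
                                          List<Function<T, CompletableFuture<T>>> steps) {
        CompletableFuture<T> start = CompletableFuture.completedFuture(initial);
        return steps.stream().reduce(
                start,
                (future, step) -> future.thenCompose(step),
                // Only acceptable on a sequential stream, where no partial
                // results need to be merged.
                (left, right) -> right);
    }

    public static void main(String[] args) {
        List<Function<String, CompletableFuture<String>>> steps = List.of(
                s -> CompletableFuture.supplyAsync(() -> s + "a"),
                s -> CompletableFuture.supplyAsync(() -> s + "b"),
                s -> CompletableFuture.supplyAsync(() -> s + "c"));
        System.out.println(chain("", steps).join()); // prints "abc"
    }
}
```

List.of needs Java 9 or later; on Java 8, Arrays.asList would do the same job.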

Upvotes: 1

João Rebelo

Reputation: 158

Assuming you are only interested in the result of the last CompletableFuture (promise), you could just take the promise from each iteration and compose upon it.

Using either an index or a for-each loop, you could use the following code snippet for your purpose:

CompletableFuture<Context> promise = CompletableFuture.completedFuture(context);
for(Function<Context, CompletableFuture<Context>> preprocessor: preprocessors) {
  promise = promise.thenCompose(ctx -> preprocessor.apply(ctx));
}
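
To make the loop easy to try out, here is a small self-contained sketch of the same idea. The class name LoopChainSketch, the helper chain and the String stand-in for Context are hypothetical and exist only for the demo. Starting from an already completed future means an empty list simply yields the initial value.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical demo class; T stands in for the question's Context type.
class LoopChainSketch {

    // Composes each step onto the previous future, so every step receives
    // the asynchronous result of the step before it.
    static <T> CompletableFuture<T> chain(T initial,
                                          List<Function<T, CompletableFuture<T>>> steps) {
        CompletableFuture<T> promise = CompletableFuture.completedFuture(initial);
        for (Function<T, CompletableFuture<T>> step : steps) {
            promise = promise.thenCompose(step);
        }
        return promise;
    }

    public static void main(String[] args) {
        List<Function<String, CompletableFuture<String>>> steps = List.of(
                s -> CompletableFuture.supplyAsync(() -> s + "a"),
                s -> CompletableFuture.supplyAsync(() -> s + "b"),
                s -> CompletableFuture.supplyAsync(() -> s + "c"));
        System.out.println(chain("", steps).join()); // prints "abc"
    }
}
```

The loop only wires the chain together and returns immediately; the steps themselves run as each preceding future completes.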

Upvotes: 3

Kayaman

Reputation: 73558

Looking into reduce(), it seemed hard to work around the special handling for the first element, and impossible to guarantee sequential handling. Using just regular Java you could write:

List<Function<Context, CompletableFuture<Context>>> preprocessors;
Context context;
Iterator<Function<Context, CompletableFuture<Context>>> itr = preprocessors.iterator();

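CompletableFuture<Context> promise = itr.next().apply(context);
while (itr.hasNext()) {
    // Fetch the next function here, not inside the lambda: the lambda runs only
    // once the previous future completes, so the loop would otherwise not advance.
    Function<Context, CompletableFuture<Context>> next = itr.next();
    promise = promise.thenCompose(next);
}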

Upvotes: 0

Jeroen Steenbeeke

Reputation: 4048

Perhaps something like RxJava might be useful:

    Flowable.fromIterable(preprocessors)
            .map(function -> function.apply(context))
            .flatMap(Flowable::fromFuture)
            .subscribe(c -> {
                // Whatever it is you want to do with the context that is returned
            });

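Though I must admit I'm not entirely sure whether this will ensure sequential execution of all futures. Note also that, as written, every function is applied to the original context rather than to the result of the previous one.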

https://github.com/ReactiveX/RxJava

Upvotes: 0
