Suh. Junmin

Reputation: 33

Chain multiple CompletableFutures sequentially and collect all results as a list

I have a long-running business job whose return type is 'CompletionStage'.

I want to run this job several times in a for loop and retrieve the results as a list.

However, the jobs must be executed sequentially because of a concurrency issue.

Here is an example process function that returns a CompletionStage:

private CompletionStage<Integer> process(int a) {
    return CompletableFuture.supplyAsync(() -> {
        System.err.printf("%s dispatch %d\n", LocalDateTime.now(), a);

        // some long running business process
        return a + 10;
    }).whenCompleteAsync((e, t) -> {
        // t is non-null only when the supplier threw; e is null in that case
        if (t != null)
            System.err.printf("!!! error processing '%d' !!!\n", a);
        else
            System.err.printf("%s finish %d\n", LocalDateTime.now(), e);
    });
}

My first approach succeeded:

// First approach

List<Integer> arr = IntStream.range(1, 10).boxed().collect(Collectors.toList());

CompletionStage<List<Integer>> result = CompletableFuture.completedFuture(new ArrayList<>());
for (Integer element: arr) {
    result = result.thenApplyAsync((ret) -> {
        Integer a = process(element).toCompletableFuture().join();

        ret.add(a);

        return ret;
    });
}

List<Integer> computeResult = result.toCompletableFuture().join();

Output:

2022-11-01T10:43:24.571573 dispatch 1
2022-11-01T10:43:24.571999 finish 11
2022-11-01T10:43:24.572414 dispatch 2
2022-11-01T10:43:24.572629 finish 12
2022-11-01T10:43:24.572825 dispatch 3
2022-11-01T10:43:24.572984 finish 13
2022-11-01T10:43:24.573097 dispatch 4
2022-11-01T10:43:24.573227 finish 14
2022-11-01T10:43:24.573354 dispatch 5
2022-11-01T10:43:24.573541 finish 15
2022-11-01T10:43:24.573657 dispatch 6
2022-11-01T10:43:24.573813 finish 16
2022-11-01T10:43:24.573929 dispatch 7
2022-11-01T10:43:24.574055 finish 17
2022-11-01T10:43:24.574168 dispatch 8
2022-11-01T10:43:24.574326 finish 18
2022-11-01T10:43:24.574428 dispatch 9
2022-11-01T10:43:24.574589 finish 19

However, waiting on a CompletableFuture inside another CompletableFuture seems a bit odd to me,

because each stage occupies two threads: one runs the job while the other blocks in join().

So I decided to refactor it,

but the second approach failed: the jobs no longer run sequentially.


// Second approach
List<Integer> arr = IntStream.range(1, 10).boxed().collect(Collectors.toList());

CompletionStage<List<Integer>> result = CompletableFuture.completedFuture(new ArrayList<>());
for (Integer element : arr) {
    result = result.thenCombineAsync(process(element), (array, ret) -> { array.add(ret); return array; });
}

result.toCompletableFuture().join();

Output:

2022-11-01T10:44:36.875930 dispatch 1
2022-11-01T10:44:36.876438 finish 11
2022-11-01T10:44:36.876461 dispatch 2
2022-11-01T10:44:36.876832 dispatch 4
2022-11-01T10:44:36.876847 finish 12
2022-11-01T10:44:36.876908 dispatch 3
2022-11-01T10:44:36.876968 dispatch 5
2022-11-01T10:44:36.877108 dispatch 8
2022-11-01T10:44:36.877042 dispatch 6
2022-11-01T10:44:36.876988 finish 14
2022-11-01T10:44:36.877123 dispatch 9
2022-11-01T10:44:36.877275 finish 18
2022-11-01T10:44:36.877195 finish 15
2022-11-01T10:44:36.877371 finish 19
2022-11-01T10:44:36.877262 dispatch 7
2022-11-01T10:44:36.877316 finish 16
2022-11-01T10:44:36.877191 finish 13
2022-11-01T10:44:36.877553 finish 17

I don't know how to chain the stages so that they run one after another and still collect the results.

Is there a better way to retrieve results from multiple CompletionStages?

Upvotes: 3

Views: 1448

Answers (1)

Davide D'Alto

Reputation: 8206

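Your second approach starts every job at once because process(element) is called immediately while the loop is still building the chain. With thenCompose, process(element) is only called after the previous stage has completed, so this should work: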

List<Integer> arr = IntStream.range(1, 10).boxed().collect(Collectors.toList());

CompletionStage<Void> loopStage = CompletableFuture.completedFuture( null );
final List<Integer> resultList = new ArrayList<>();
for (Integer element: arr) {
    loopStage = loopStage
            .thenCompose( v -> process( element ) )
            .thenAccept( resultList::add );
}

loopStage.toCompletableFuture().join();

// resultList now contains all the results
System.out.println(resultList);

or, keeping the list inside the chain:

List<Integer> arr = IntStream.range( 1, 10 ).boxed().collect( Collectors.toList() );
CompletionStage<List<Integer>> listStage = CompletableFuture.completedFuture( new ArrayList<>() );
for ( Integer element : arr ) {
    listStage = listStage
            .thenCompose( list -> process( element )
                    .thenAccept( list::add )
                    .thenApply( v -> list )
            );
}

List<Integer> resultList = listStage.toCompletableFuture().join();

System.out.println(resultList);
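
If you need this in more than one place, the same thenCompose pattern can be wrapped in a small helper. The following is only a sketch under a few assumptions: the class name SequentialStages, the helper runSequentially, the running/maxRunning counters and the 20 ms sleep are all made up for illustration and are not part of your code. The counters are there only to check that no two jobs overlap. List.of needs Java 9 or later.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

public class SequentialStages {

    // Hypothetical helper: applies `task` to each input, starting the next
    // task only after the previous one has completed.
    static <T, R> CompletionStage<List<R>> runSequentially(
            List<T> inputs, Function<T, CompletionStage<R>> task) {
        CompletionStage<List<R>> chain =
                CompletableFuture.completedFuture(new ArrayList<>());
        for (T input : inputs) {
            // task.apply(input) is called lazily, inside thenCompose
            chain = chain.thenCompose(list ->
                    task.apply(input).thenApply(result -> {
                        list.add(result);
                        return list;
                    }));
        }
        return chain;
    }

    // Illustration-only counters that record how many jobs ran at once.
    static final AtomicInteger running = new AtomicInteger();
    static final AtomicInteger maxRunning = new AtomicInteger();

    // Stand-in for the long-running job from the question (assumed body).
    static CompletionStage<Integer> process(int a) {
        return CompletableFuture.supplyAsync(() -> {
            int now = running.incrementAndGet();
            maxRunning.accumulateAndGet(now, Math::max);
            try {
                Thread.sleep(20); // simulate work
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            running.decrementAndGet();
            return a + 10;
        });
    }

    public static void main(String[] args) {
        List<Integer> results =
                runSequentially(List.of(1, 2, 3, 4), SequentialStages::process)
                        .toCompletableFuture()
                        .join();
        System.out.println(results);                                      // [11, 12, 13, 14]
        System.out.println("max concurrent jobs: " + maxRunning.get());  // 1
    }
}
```

No stage blocks a thread waiting for another one here; join() is only called once, at the very end, by the caller.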

Upvotes: 4
