Savior

Reputation: 3531

How do I collect the results of calling an async API sequentially?

I have an async API that essentially returns results through pagination:

public CompletableFuture<Response> getNext(int startFrom);

Each Response object contains a list of offsets from startFrom and a flag indicating whether there are more elements remaining and, therefore, another getNext() request to make.
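For readers who want to run the snippets in this thread, here is a minimal sketch of what Response and the getLast() helper might look like. Neither is shown in the question, so the field names, the constructor, and the ResponseHelpers class are assumptions made purely for illustration.

```java
import java.util.List;

// Hypothetical shape of the Response type; the real API may differ.
final class Response {
    private final List<Integer> offsets;
    private final boolean more;

    Response(List<Integer> offsets, boolean more) {
        this.offsets = List.copyOf(offsets); // defensive, immutable copy
        this.more = more;
    }

    // Offsets contained in this page.
    List<Integer> getOffsets() { return offsets; }

    // True when another getNext() call is needed.
    boolean hasMore() { return more; }
}

// Hypothetical holder for the getLast() helper used in the question.
final class ResponseHelpers {
    // Returns the last offset of a non-empty page; it becomes the next startFrom.
    static int getLast(List<Integer> offsets) {
        return offsets.get(offsets.size() - 1);
    }
}
```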

I'd like to write a method that goes through all the pages and retrieves all the offsets. I can write it in a synchronous manner like so:

int startFrom = 0;
List<Integer> offsets = new ArrayList<>();

for (;;) {
    CompletableFuture<Response> future = getNext(startFrom);
    Response response = future.get(); // an exception stops everything
    if (response.getOffsets().isEmpty()) {
        break; // we're done
    }
    offsets.addAll(response.getOffsets());
    if (!response.hasMore()) {
        break; // we're done
    }
    startFrom = getLast(response.getOffsets());
}

In other words, we call getNext() with startFrom at 0. If an exception is thrown, we short-circuit the entire process. Otherwise, if there are no offsets, we complete. If there are offsets, we add them to the master list. If there are no more left to fetch, we complete. Otherwise, we reset the startFrom to the last offset we fetched and repeat.

Ideally, I want to do this without blocking on CompletableFuture::get(), and return a CompletableFuture<List<Integer>> containing all the offsets.

How can I do this? How can I compose the futures to collect their results?


I'm thinking of a "recursive" approach (recursive in the code, though not in actual execution):

private CompletableFuture<List<Integer>> recur(int startFrom, List<Integer> offsets) {
    CompletableFuture<Response> future = getNext(startFrom);
    return future.thenCompose((response) -> {
        if (response.getOffsets().isEmpty()) {
            return CompletableFuture.completedFuture(offsets);
        }
        offsets.addAll(response.getOffsets());
        if (!response.hasMore()) {
            return CompletableFuture.completedFuture(offsets);
        }
        return recur(getLast(response.getOffsets()), offsets);
    });
}

public CompletableFuture<List<Integer>> getAll() {
    List<Integer> offsets = new ArrayList<>();
    return recur(0, offsets);
}

I don't love this, from a complexity point of view. Can we do better?

Upvotes: 6

Views: 2003

Answers (2)

Didier L

Reputation: 20579

I also wanted to give EA Async a shot on this one, as it implements async/await support for Java (inspired by C#). So I just took your initial code and converted it:

public CompletableFuture<List<Integer>> getAllEaAsync() {
    int startFrom = 0;
    List<Integer> offsets = new ArrayList<>();

    for (;;) {
        // this is the only thing I changed!
        Response response = Async.await(getNext(startFrom));
        if (response.getOffsets().isEmpty()) {
            break; // we're done
        }
        offsets.addAll(response.getOffsets());
        if (!response.hasMore()) {
            break; // we're done
        }
        startFrom = getLast(response.getOffsets());
    }

    // well, you also have to wrap your result in a future to make it compilable
    return CompletableFuture.completedFuture(offsets);
}

You then have to instrument your code, for example by adding

Async.init();

at the beginning of your main() method.

I must say: this really looks like magic!

Behind the scenes, EA Async notices there is an Async.await() call within the method, and rewrites it to handle all the thenCompose()/thenApply()/recursion for you. The only requirement is that your method must return a CompletionStage or CompletableFuture.

That's really async code made easy!

Upvotes: 2

Didier L

Reputation: 20579

As an exercise, I made a generic version of this algorithm, but it is rather complex because you need:

  1. an initial value to call the service (the startFrom)
  2. the service call itself (getNext())
  3. a result container to accumulate the intermediate values (the offsets)
  4. an accumulator (offsets.addAll(response.getOffsets()))
  5. a condition to perform the "recursion" (response.hasMore())
  6. a function to compute the next input (getLast(response.getOffsets()))

so this gives:

public <T, I, R> CompletableFuture<R> recur(T initialInput, R resultContainer,
        Function<T, CompletableFuture<I>> service,
        BiConsumer<R, I> accumulator,
        Predicate<I> continueRecursion,
        Function<I, T> nextInput) {
    return service.apply(initialInput)
            .thenCompose(response -> {
                accumulator.accept(resultContainer, response);
                if (continueRecursion.test(response)) {
                    return recur(nextInput.apply(response),
                            resultContainer, service, accumulator,
                            continueRecursion, nextInput);
                } else {
                    return CompletableFuture.completedFuture(resultContainer);
                }
            });
}

public CompletableFuture<List<Integer>> getAll() {
    return recur(0, new ArrayList<>(), this::getNext,
            (list, response) -> list.addAll(response.getOffsets()),
            Response::hasMore,
            r -> getLast(r.getOffsets()));
}
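To try the generic version end to end, a self-contained sketch could look like the following. The Page class, the fake in-memory getNext() (offsets 1 to 10, four per page), and the class name PagedCollectDemo are all assumptions standing in for the real Response type and service; only recur() mirrors the code above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Predicate;

class PagedCollectDemo {
    // Hypothetical stand-in for Response.
    static final class Page {
        private final List<Integer> offsets;
        private final boolean more;
        Page(List<Integer> offsets, boolean more) { this.offsets = offsets; this.more = more; }
        List<Integer> getOffsets() { return offsets; }
        boolean hasMore() { return more; }
    }

    static final int TOTAL = 10;     // fake data set: offsets 1..10
    static final int PAGE_SIZE = 4;  // at most four offsets per page

    // Fake async service: returns up to PAGE_SIZE offsets greater than startFrom.
    static CompletableFuture<Page> getNext(int startFrom) {
        return CompletableFuture.supplyAsync(() -> {
            List<Integer> offsets = new ArrayList<>();
            for (int i = startFrom + 1; i <= TOTAL && offsets.size() < PAGE_SIZE; i++) {
                offsets.add(i);
            }
            boolean more = !offsets.isEmpty() && offsets.get(offsets.size() - 1) < TOTAL;
            return new Page(offsets, more);
        });
    }

    // Same structure as the generic recur() shown above.
    static <T, I, R> CompletableFuture<R> recur(T initialInput, R resultContainer,
            Function<T, CompletableFuture<I>> service,
            BiConsumer<R, I> accumulator,
            Predicate<I> continueRecursion,
            Function<I, T> nextInput) {
        return service.apply(initialInput).thenCompose(response -> {
            accumulator.accept(resultContainer, response);
            if (continueRecursion.test(response)) {
                return recur(nextInput.apply(response), resultContainer,
                        service, accumulator, continueRecursion, nextInput);
            } else {
                return CompletableFuture.completedFuture(resultContainer);
            }
        });
    }

    static CompletableFuture<List<Integer>> getAll() {
        return recur(0, new ArrayList<>(), PagedCollectDemo::getNext,
                (list, page) -> list.addAll(page.getOffsets()),
                Page::hasMore,
                page -> page.getOffsets().get(page.getOffsets().size() - 1));
    }

    public static void main(String[] args) {
        // join() blocks only here, at the edge of the demo, to print the result.
        System.out.println(getAll().join());
    }
}
```

With this fake service the chain makes three calls (startFrom 0, 4 and 8) and the resulting list holds the offsets 1 through 10 in order.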

recur() can be simplified a little: replace initialInput with the CompletableFuture returned by the first call, merge the resultContainer and the accumulator into a single Consumer, and merge the service with the nextInput function.

But this gives a slightly more complex getAll():

private <I> CompletableFuture<Void> recur(CompletableFuture<I> future,
        Consumer<I> accumulator,
        Predicate<I> continueRecursion,
        Function<I, CompletableFuture<I>> service) {
    return future.thenCompose(result -> {
        accumulator.accept(result);
        if (continueRecursion.test(result)) {
            return recur(service.apply(result), accumulator, continueRecursion, service);
        } else {
            return CompletableFuture.completedFuture(null);
        }
    });
}

public CompletableFuture<List<Integer>> getAll() {
    ArrayList<Integer> resultContainer = new ArrayList<>();
    return recur(getNext(0),
            result -> resultContainer.addAll(result.getOffsets()),
            Response::hasMore,
            r -> getNext(getLast(r.getOffsets())))
            .thenApply(unused -> resultContainer);
}
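The question's blocking loop stops as soon as one call throws, and the thenCompose() chain behaves the same way: a failed page completes the final future exceptionally and no further calls are made. A small sketch to check this, assuming a hypothetical fetchPage() service whose pages are plain integers and whose page 2 always fails:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

class FailurePropagationDemo {
    // Same structure as the simplified recur() above.
    static <I> CompletableFuture<Void> recur(CompletableFuture<I> future,
            Consumer<I> accumulator,
            Predicate<I> continueRecursion,
            Function<I, CompletableFuture<I>> service) {
        return future.thenCompose(result -> {
            accumulator.accept(result);
            if (continueRecursion.test(result)) {
                return recur(service.apply(result), accumulator, continueRecursion, service);
            } else {
                return CompletableFuture.completedFuture(null);
            }
        });
    }

    // Hypothetical fake service: each page is just its own number; page 2 fails.
    static CompletableFuture<Integer> fetchPage(int page) {
        return CompletableFuture.supplyAsync(() -> {
            if (page == 2) {
                throw new IllegalStateException("page 2 unavailable");
            }
            return page;
        });
    }

    static String run() {
        List<Integer> seen = new ArrayList<>();
        CompletableFuture<List<Integer>> all = recur(fetchPage(0),
                page -> seen.add(page),
                page -> page < 5,                 // would continue up to page 5
                page -> fetchPage(page + 1))
                .thenApply(unused -> seen);
        try {
            all.join();
            return "completed: " + seen;
        } catch (CompletionException e) {
            // join() wraps the original exception in a CompletionException.
            return "failed after " + seen + ": " + e.getCause().getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "failed after [0, 1]: page 2 unavailable"
    }
}
```

Note that pages accumulated before the failure remain in the container; a caller that wants partial results would have to handle the exception explicitly, for example with handle() or exceptionally().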

Upvotes: 1
