Reputation: 3531
I have an async API that essentially returns results through pagination:
public CompletableFuture<Response> getNext(int startFrom);
Each Response object contains a list of offsets from startFrom and a flag indicating whether there are more elements remaining and, therefore, another getNext() request to make.
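To make that contract concrete, here is a minimal in-memory sketch. Response's fields and constructor, the TOTAL and PAGE_SIZE constants, and the getLast() helper are all assumptions for illustration; the real types are not shown in this post.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-ins for the real API, for illustration only.
class PagedApiSketch {

    /** One page: the offsets after startFrom, plus a "more pages" flag. */
    static final class Response {
        private final List<Integer> offsets;
        private final boolean more;

        Response(List<Integer> offsets, boolean more) {
            this.offsets = offsets;
            this.more = more;
        }

        List<Integer> getOffsets() { return offsets; }
        boolean hasMore() { return more; }
    }

    static final int TOTAL = 7;     // assumed: offsets 1..7 exist
    static final int PAGE_SIZE = 3; // assumed page size

    /** Asynchronously returns up to PAGE_SIZE offsets greater than startFrom. */
    static CompletableFuture<Response> getNext(int startFrom) {
        return CompletableFuture.supplyAsync(() -> {
            List<Integer> page = new ArrayList<>();
            for (int i = startFrom + 1; i <= TOTAL && page.size() < PAGE_SIZE; i++) {
                page.add(i);
            }
            boolean more = !page.isEmpty() && getLast(page) < TOTAL;
            return new Response(page, more);
        });
    }

    static int getLast(List<Integer> offsets) {
        return offsets.get(offsets.size() - 1);
    }

    public static void main(String[] args) {
        Response first = getNext(0).join();
        // prints "[1, 2, 3] more=true"
        System.out.println(first.getOffsets() + " more=" + first.hasMore());
    }
}
```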
I'd like to write a method that goes through all the pages and retrieves all the offsets. I can write it in a synchronous manner like so:
int startFrom = 0;
List<Integer> offsets = new ArrayList<>();
for (;;) {
    CompletableFuture<Response> future = getNext(startFrom);
    Response response = future.get(); // an exception stops everything
    if (response.getOffsets().isEmpty()) {
        break; // we're done
    }
    offsets.addAll(response.getOffsets());
    if (!response.hasMore()) {
        break; // we're done
    }
    startFrom = getLast(response.getOffsets());
}
In other words, we call getNext() with startFrom at 0. If an exception is thrown, we short-circuit the entire process. Otherwise, if there are no offsets, we complete. If there are offsets, we add them to the master list. If there are no more left to fetch, we complete. Otherwise, we reset startFrom to the last offset we fetched and repeat.
Ideally, I want to do this without blocking on CompletableFuture::get(), and instead return a CompletableFuture<List<Integer>> containing all the offsets.
How can I do this? How can I compose the futures to collect their results?
I'm thinking of a "recursive" (not actually in execution, but in code) approach:
private CompletableFuture<List<Integer>> recur(int startFrom, List<Integer> offsets) {
    CompletableFuture<Response> future = getNext(startFrom);
    return future.thenCompose((response) -> {
        if (response.getOffsets().isEmpty()) {
            return CompletableFuture.completedFuture(offsets);
        }
        offsets.addAll(response.getOffsets());
        if (!response.hasMore()) {
            return CompletableFuture.completedFuture(offsets);
        }
        return recur(getLast(response.getOffsets()), offsets);
    });
}

public CompletableFuture<List<Integer>> getAll() {
    List<Integer> offsets = new ArrayList<>();
    return recur(0, offsets);
}
I don't love this, from a complexity point of view. Can we do better?
Upvotes: 6
Views: 2003
Reputation: 20579
I also wanted to give EA Async a shot on this one, as it implements async/await support for Java (inspired by C#). So I just took your initial code and converted it:
public CompletableFuture<List<Integer>> getAllEaAsync() {
    int startFrom = 0;
    List<Integer> offsets = new ArrayList<>();
    for (;;) {
        // this is the only thing I changed!
        Response response = Async.await(getNext(startFrom));
        if (response.getOffsets().isEmpty()) {
            break; // we're done
        }
        offsets.addAll(response.getOffsets());
        if (!response.hasMore()) {
            break; // we're done
        }
        startFrom = getLast(response.getOffsets());
    }
    // well, you also have to wrap your result in a future to make it compilable
    return CompletableFuture.completedFuture(offsets);
}
You then have to instrument your code, for example by adding Async.init(); at the beginning of your main() method.
I must say: this really looks like magic!
Behind the scenes, EA Async notices there is an Async.await() call within the method, and rewrites it to handle all the thenCompose()/thenApply()/recursion for you. The only requirement is that your method must return a CompletionStage or CompletableFuture.
That's really async code made easy!
Upvotes: 2
Reputation: 20579
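For the exercise, I made a generic version of this algorithm, but it is rather complex because you need:
- an initial input (startFrom)
- a service to call (getNext())
- a result container (offsets)
- an accumulator (offsets.addAll(response.getOffsets()))
- a condition for continuing the recursion (response.hasMore())
- a function computing the next input (getLast(response.getOffsets()))
So this gives: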
public <T, I, R> CompletableFuture<R> recur(T initialInput, R resultContainer,
        Function<T, CompletableFuture<I>> service,
        BiConsumer<R, I> accumulator,
        Predicate<I> continueRecursion,
        Function<I, T> nextInput) {
    return service.apply(initialInput)
            .thenCompose(response -> {
                accumulator.accept(resultContainer, response);
                if (continueRecursion.test(response)) {
                    return recur(nextInput.apply(response),
                            resultContainer, service, accumulator,
                            continueRecursion, nextInput);
                } else {
                    return CompletableFuture.completedFuture(resultContainer);
                }
            });
}

public CompletableFuture<List<Integer>> getAll() {
    return recur(0, new ArrayList<>(), this::getNext,
            (list, response) -> list.addAll(response.getOffsets()),
            Response::hasMore,
            r -> getLast(r.getOffsets()));
}
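To try the generic version end to end, here is a self-contained sketch that runs it against an in-memory service. The Response class, the 7-offset data set, the page size of 3, and getLast() are assumptions for illustration; the explicit type arguments on recur() are only there to make the inference unambiguous.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Predicate;

class GenericRecurDemo {

    // Hypothetical page type; the real Response class is not shown in the post.
    static final class Response {
        private final List<Integer> offsets;
        private final boolean more;

        Response(List<Integer> offsets, boolean more) {
            this.offsets = offsets;
            this.more = more;
        }

        List<Integer> getOffsets() { return offsets; }
        boolean hasMore() { return more; }
    }

    // Hypothetical service: serves offsets 1..7 in pages of at most 3.
    static CompletableFuture<Response> getNext(int startFrom) {
        return CompletableFuture.supplyAsync(() -> {
            List<Integer> page = new ArrayList<>();
            for (int i = startFrom + 1; i <= 7 && page.size() < 3; i++) {
                page.add(i);
            }
            boolean more = !page.isEmpty() && getLast(page) < 7;
            return new Response(page, more);
        });
    }

    static int getLast(List<Integer> offsets) {
        return offsets.get(offsets.size() - 1);
    }

    // Same shape as the generic recur() above, made static for the demo.
    static <T, I, R> CompletableFuture<R> recur(T initialInput, R resultContainer,
            Function<T, CompletableFuture<I>> service,
            BiConsumer<R, I> accumulator,
            Predicate<I> continueRecursion,
            Function<I, T> nextInput) {
        return service.apply(initialInput).thenCompose(response -> {
            accumulator.accept(resultContainer, response);
            if (continueRecursion.test(response)) {
                return recur(nextInput.apply(response), resultContainer,
                        service, accumulator, continueRecursion, nextInput);
            }
            return CompletableFuture.completedFuture(resultContainer);
        });
    }

    static CompletableFuture<List<Integer>> getAll() {
        return GenericRecurDemo.<Integer, Response, List<Integer>>recur(
                0, new ArrayList<>(), GenericRecurDemo::getNext,
                (list, response) -> list.addAll(response.getOffsets()),
                Response::hasMore,
                r -> getLast(r.getOffsets()));
    }

    public static void main(String[] args) {
        System.out.println(getAll().join()); // prints [1, 2, 3, 4, 5, 6, 7]
    }
}
```

Note that, unlike the loop in the question, this version has no separate check for an empty page: it relies on hasMore() alone, so the stub never returns an empty page with hasMore() set.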
A small simplification of recur() is possible: replace initialInput with the CompletableFuture returned by the first call, merge the resultContainer and the accumulator into a single Consumer, and merge the service with the nextInput function. But this makes getAll() a little more complex:
private <I> CompletableFuture<Void> recur(CompletableFuture<I> future,
        Consumer<I> accumulator,
        Predicate<I> continueRecursion,
        Function<I, CompletableFuture<I>> service) {
    return future.thenCompose(result -> {
        accumulator.accept(result);
        if (continueRecursion.test(result)) {
            return recur(service.apply(result), accumulator, continueRecursion, service);
        } else {
            return CompletableFuture.completedFuture(null);
        }
    });
}

public CompletableFuture<List<Integer>> getAll() {
    ArrayList<Integer> resultContainer = new ArrayList<>();
    return recur(getNext(0),
            result -> resultContainer.addAll(result.getOffsets()),
            Response::hasMore,
            r -> getNext(getLast(r.getOffsets())))
            .thenApply(unused -> resultContainer);
}
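The question also asks that an exception stop everything. With thenCompose() that should come for free, since a failed page completes the whole chain exceptionally. The sketch below checks this using the simplified recur(); the Response class, getLast(), and the stub getNext() that fails after the first page are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

class ShortCircuitDemo {

    // Hypothetical page type; the real Response class is not shown in the post.
    static final class Response {
        private final List<Integer> offsets;
        private final boolean more;

        Response(List<Integer> offsets, boolean more) {
            this.offsets = offsets;
            this.more = more;
        }

        List<Integer> getOffsets() { return offsets; }
        boolean hasMore() { return more; }
    }

    // Hypothetical service: the first page succeeds, every later page fails.
    static CompletableFuture<Response> getNext(int startFrom) {
        if (startFrom == 0) {
            return CompletableFuture.completedFuture(
                    new Response(Arrays.asList(1, 2, 3), true));
        }
        CompletableFuture<Response> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("page unavailable"));
        return failed;
    }

    static int getLast(List<Integer> offsets) {
        return offsets.get(offsets.size() - 1);
    }

    // Same shape as the simplified recur() above, made static for the demo.
    static <I> CompletableFuture<Void> recur(CompletableFuture<I> future,
            Consumer<I> accumulator,
            Predicate<I> continueRecursion,
            Function<I, CompletableFuture<I>> service) {
        return future.thenCompose(result -> {
            accumulator.accept(result);
            if (continueRecursion.test(result)) {
                return recur(service.apply(result), accumulator, continueRecursion, service);
            }
            return CompletableFuture.<Void>completedFuture(null);
        });
    }

    static CompletableFuture<List<Integer>> getAll() {
        List<Integer> resultContainer = new ArrayList<>();
        return ShortCircuitDemo.<Response>recur(getNext(0),
                result -> resultContainer.addAll(result.getOffsets()),
                Response::hasMore,
                r -> getNext(getLast(r.getOffsets())))
                .thenApply(unused -> resultContainer);
    }

    public static void main(String[] args) {
        try {
            getAll().join();
        } catch (CompletionException e) {
            // The second page's failure surfaces as the cause.
            System.out.println("failed: " + e.getCause().getMessage()); // prints "failed: page unavailable"
        }
    }
}
```

The offsets collected before the failure are discarded here, which matches the blocking loop, where future.get() throws before the list is ever returned.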
Upvotes: 1