Reputation: 5289
I'm trying to make the calls to multiple APIs faster.
In the code below, getFilteredEvents is the current, synchronous version. I have the feeling that the map(x -> x.getFilteredEvents(eventResearch)) operation waits for the response of each API (it uses RestTemplate.exchange() internally) before passing on to the next one to build the List<Event> that I want to return. A solution would probably be to launch the map calls on separate threads, but I wanted to try out the CompletableFuture API.
Thus, getFilteredEventsFaster is the result of my efforts to improve the response time.
@Service
public class EventsResearchService {

    @Autowired
    private List<UniformEventsResearchApi> eventsResearchApis;

    // this works, but I'm trying to improve it
    public EventResearchResponse getFilteredEvents(EventResearch eventResearch) {
        List<Event> eventsList = eventsResearchApis
                .stream()
                .map(x -> x.getFilteredEvents(eventResearch))
                .flatMap(List::stream)
                .collect(Collectors.toList());
        return extractResponse(eventResearch, eventsList);
    }

    // this doesn't work yet: what is wrong?
    public CompletableFuture<List<Event>> getFilteredEventsFaster(EventResearch eventResearch) {
        List<CompletableFuture<List<Event>>> futureEventsList = eventsResearchApis
                .parallelStream()
                .map(x -> CompletableFuture.supplyAsync(() -> x.getFilteredEvents(eventResearch)))
                .collect(Collectors.toList());
        return CompletableFuture.allOf(futureEventsList.toArray(new CompletableFuture<List<Event>>[0]));
    }
}
My understanding is that I would want to send a CompletableFuture<List<Event>> back to my frontend, rather than the List<CompletableFuture<List<Event>>>, hence the CompletableFuture.allOf() call (which, if I understood properly, resembles a flatMap operation, creating a single CompletableFuture from multiple CompletableFutures).
Unfortunately, as it is, I get a "Generic array creation" compilation error when using new CompletableFuture<List<Event>>[0].
What am I doing wrong?
I have the feeling that using the join method would indeed allow me to collect all the answers, but that would be a blocking operation on the thread of my Service, wouldn't it? (Which would defeat the purpose of trying to return a CompletableFuture to my frontend, if I understand correctly.)
Upvotes: 2
Views: 9069
Reputation: 285
The following snippet shows the use of listOfFutures.stream().map(CompletableFuture::join) to collect the results after allOf. I have taken this example from this page, which states that the join calls won't have to wait for each future to finish: they only run once allOf has completed, so every future is already done by then.
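import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import static java.util.stream.Collectors.toList;

class Test {
    public static void main(String[] args) throws Exception {
        long millisBefore = System.currentTimeMillis();
        List<String> strings = Arrays.asList("1", "2", "3", "4", "5", "6", "7", "8");
        // Start all downloads first; each one runs asynchronously.
        List<CompletableFuture<String>> listOfFutures = strings.stream().map(Test::downloadWebPage).collect(toList());
        // allOf completes when every future has completed; the joins then return immediately.
        CompletableFuture<List<String>> futureOfList = CompletableFuture
                .allOf(listOfFutures.toArray(new CompletableFuture[0]))
                .thenApply(v -> listOfFutures.stream().map(CompletableFuture::join).collect(toList()));
        System.out.println(futureOfList.get()); // blocks here
        System.out.printf("time taken : %.4fs\n", (System.currentTimeMillis() - millisBefore) / 1000d);
    }

    private static CompletableFuture<String> downloadWebPage(String webPageLink) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(4); // simulates a slow download
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
            // Returning after the try block (not from finally) keeps exceptions visible.
            return "downloaded : " + webPageLink;
        });
    }
}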
Since efficiency seems to be the issue here, I have included a dummy benchmark to show that it does not take 32 seconds (8 tasks of 4 seconds each) to execute.
Output :
[downloaded : 1, downloaded : 2, downloaded : 3, downloaded : 4, downloaded : 5, downloaded : 6, downloaded : 7, downloaded : 8]
time taken : 8.0630s
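The 8 seconds above most likely reflect the default thread pool rather than the technique itself: supplyAsync without an executor argument runs its tasks on ForkJoinPool.commonPool(), whose default parallelism is one less than the number of available processors, so eight blocking 4-second tasks can end up running in two or more batches. A minimal sketch, assuming a dedicated fixed-size pool is acceptable for blocking calls; the class name ExecutorDemo, the fakeCall helper, the pool size, and the shortened 200 ms sleep are all illustrative and not part of the original code:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

public class ExecutorDemo {

    // Hypothetical stand-in for a blocking HTTP call.
    static String fakeCall(String id) {
        try {
            TimeUnit.MILLISECONDS.sleep(200);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return "downloaded : " + id;
    }

    // Runs every call on the given pool and returns one future holding all results.
    static CompletableFuture<List<String>> downloadAll(List<String> ids, ExecutorService pool) {
        List<CompletableFuture<String>> futures = ids.stream()
                .map(id -> CompletableFuture.supplyAsync(() -> fakeCall(id), pool))
                .collect(Collectors.toList());
        return CompletableFuture
                .allOf(futures.toArray(new CompletableFuture<?>[0]))
                .thenApply(ignored -> futures.stream()
                        .map(CompletableFuture::join) // every future is already complete here
                        .collect(Collectors.toList()));
    }

    public static void main(String[] args) {
        List<String> ids = Arrays.asList("1", "2", "3", "4", "5", "6", "7", "8");
        // One thread per task, so no blocking call has to queue behind another.
        ExecutorService pool = Executors.newFixedThreadPool(ids.size());
        try {
            long start = System.nanoTime();
            List<String> results = downloadAll(ids, pool).join();
            long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
            System.out.println(results);
            System.out.println("elapsed ms: " + elapsedMs);
        } finally {
            pool.shutdown();
        }
    }
}
```

With one thread per task, the total time should be close to a single call's duration. Whether a pool that large is appropriate depends on how many APIs are called and how the rest of the application uses threads.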
Thanks to this answer, and with the help of this website (which discusses exception handling related to allOf), I came up with this completed version:
public CompletableFuture<List<Event>> getFilteredEventsFaster(EventResearch eventResearch) {
    /* Collecting the list of all the async requests that build a List<Event>. */
    List<CompletableFuture<List<Event>>> completableFutures = eventsResearchApis.stream()
            .map(api -> getFilteredEventsAsync(api, eventResearch))
            .collect(Collectors.toList());

    /* Creating a single Future that contains all the Futures we just created ("flatmap"). */
    CompletableFuture<Void> allFutures = CompletableFuture.allOf(completableFutures
            .toArray(new CompletableFuture[eventsResearchApis.size()]));

    /* When all the Futures have completed, we join them to create merged List<Event>. */
    CompletableFuture<List<Event>> allCompletableFutures = allFutures
            .thenApply(future -> completableFutures.stream()
                    .map(CompletableFuture::join)
                    .flatMap(List::stream) // creating a List<Event> from List<List<Event>>
                    .collect(Collectors.toList())
            );

    return allCompletableFutures;
}

private CompletableFuture<List<Event>> getFilteredEventsAsync(UniformEventsResearchApi api,
        EventResearch eventResearch) {
    /* Manage the Exceptions here to ensure the wrapping Future returns the other calls. */
    return CompletableFuture.supplyAsync(() -> api.getFilteredEvents(eventResearch))
            .exceptionally(ex -> {
                LOGGER.error("Extraction of events from API went wrong: ", ex);
                return Collections.emptyList(); // gets managed in the wrapping Future
            });
}
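The same pattern can be tried outside of Spring. The following is a self-contained sketch, not the service's actual code: MergeDemo, fetchAsync, and mergeAll are hypothetical names, and plain Supplier<List<T>> values stand in for the API clients. It uses new CompletableFuture<?>[0], which is one way to avoid both the "Generic array creation" error and the raw-type array, and it shows that one failing source does not prevent the others from being merged. The caller chains further work with thenApply instead of blocking, which is roughly what applying extractResponse to the merged list would look like:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import java.util.stream.Collectors;

public class MergeDemo {

    // Wraps one source so that a failure yields an empty list instead of failing the batch.
    static <T> CompletableFuture<List<T>> fetchAsync(Supplier<List<T>> source) {
        return CompletableFuture.supplyAsync(source)
                .exceptionally(ex -> Collections.emptyList());
    }

    // Starts all sources, then flattens their results once every future has completed.
    static <T> CompletableFuture<List<T>> mergeAll(List<Supplier<List<T>>> sources) {
        List<CompletableFuture<List<T>>> futures = sources.stream()
                .map(source -> MergeDemo.<T>fetchAsync(source))
                .collect(Collectors.toList());
        return CompletableFuture
                // An array of CompletableFuture<?> is legal; CompletableFuture<List<T>>[] is not.
                .allOf(futures.toArray(new CompletableFuture<?>[0]))
                .thenApply(ignored -> futures.stream()
                        .map(CompletableFuture::join) // does not block: all futures are done
                        .flatMap(List::stream)
                        .collect(Collectors.toList()));
    }

    public static void main(String[] args) {
        List<Supplier<List<String>>> sources = Arrays.asList(
                () -> Arrays.asList("a", "b"),
                () -> { throw new IllegalStateException("API down"); },
                () -> Arrays.asList("c"));

        mergeAll(sources)
                .thenApply(events -> "found " + events.size() + " events")
                .thenAccept(System.out::println) // prints "found 3 events"
                .join(); // only this demo waits; a service would return the future instead
    }
}
```

Note that allOf itself only yields a CompletableFuture<Void>, so the thenApply step is what turns it into a future of the merged list.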
Upvotes: 7