halileohalilei

Reputation: 2280

Sending queries in parallel with ExecutorService in Java 8

I'm writing a web application in Java (using JavaLite). In this application, I have an endpoint which, when invoked, should send a batch of requests to another server. Since the number of these requests may grow, I decided to send them in parallel using an ExecutorService from java.util.concurrent, backed by the work-stealing pool that was added in Java 8. My code for sending multiple requests in parallel is as follows:

public List<String> searchAll(List<String> keywords) {
    ExecutorService executor = Executors.newWorkStealingPool();
    List<Callable<List<String>>> tasks = new ArrayList<>();
    for (String key : keywords) {
        tasks.add(() -> {
            LOGGER.info("Sending query for key: " + key);
            return sendSearchQuery(key);
        });
    }
    List<String> all = new ArrayList<>();
    try {
        executor.invokeAll(tasks)
                .stream()
                .map(future -> {
                    try {
                        return future.get();
                    }
                    catch (Exception e) {
                        throw new IllegalStateException(e);
                    }
                })
                .forEach((list) ->
                {
                    LOGGER.info("Received list: " + list);
                    all.addAll(list);
                });
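    } catch (InterruptedException e) {
        // Restore the interrupt flag so callers can see the interruption.
        Thread.currentThread().interrupt();
    } finally {
        // A new pool is created on every call, so release its worker threads.
        executor.shutdown();
    }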
    return all;
}

private List<String> sendSearchQuery(String query) throws UnirestException {
    long startTime = System.nanoTime();
    // asString() yields an HttpResponse<String>, whose body is passed to JsonHelper.toMap below.
    HttpResponse<String> response = Unirest.get(SEARCH_URL)
            .queryString("q", query).asString();
    Map<String, Object> result = JsonHelper.toMap(response.getBody());
//    Get get = Http.get(SEARCH_URL + "?q=" + query);
//    Map<String, Object> result = JsonHelper.toMap(get.text());
    LOGGER.info("Query received in " + (System.nanoTime() - startTime) / 1000000 + " ms for key: " + query);
    return (List<String>) result.get("result");
}

And the output for this piece of code is as follows:

[ForkJoinPool-2-worker-1] INFO app.managers.SearchManager - Sending query for key: sky
[ForkJoinPool-2-worker-2] INFO app.managers.SearchManager - Sending query for key: outdoor
[ForkJoinPool-2-worker-3] INFO app.managers.SearchManager - Sending query for key: bridge
[ForkJoinPool-2-worker-0] INFO app.managers.SearchManager - Sending query for key: water
[ForkJoinPool-2-worker-0] INFO app.managers.SearchManager - Query received in 1331 ms for key: water
[ForkJoinPool-2-worker-0] INFO app.managers.SearchManager - Sending query for key: building
[ForkJoinPool-2-worker-1] INFO app.managers.SearchManager - Query received in 1332 ms for key: sky
[ForkJoinPool-2-worker-2] INFO app.managers.SearchManager - Query received in 1332 ms for key: outdoor
[ForkJoinPool-2-worker-3] INFO app.managers.SearchManager - Query received in 1332 ms for key: bridge
[ForkJoinPool-2-worker-0] INFO app.managers.SearchManager - Query received in 302 ms for key: building
[1324676647@qtp-178658894-0] INFO app.managers.SearchManager - Received list: [16973, 4564, 12392, 1195, 1207, 682, 10518, 10532, 10545, 19328, 10524, 10537, 10551, 19334, 10522, 10535, 10548, 19332, 10521, 10534]
[1324676647@qtp-178658894-0] INFO app.managers.SearchManager - Received list: []
[1324676647@qtp-178658894-0] INFO app.managers.SearchManager - Received list: [4303, 2844, 4366]
[1324676647@qtp-178658894-0] INFO app.managers.SearchManager - Received list: [9490, 1638, 20006, 17715, 17758, 18788, 6071, 11230, 13384, 4940, 18039, 17871, 16629, 6148, 19172, 4263, 4569, 8396, 18643, 4904]
[1324676647@qtp-178658894-0] INFO app.managers.SearchManager - Received list: [17306, 17303, 17305, 17304, 16062, 16156, 16153, 16154, 16061, 9098, 2491, 4368, 22134, 1008, 16152, 16151, 16148, 16155, 16147, 16149]

As you can see, I tried two different HTTP libraries (JavaLite Http and Unirest) to check whether the problem was with the library I was using. That doesn't seem to be the case, as both show the same problem.

The problem is that the first n queries (where n is the number of processors on the machine) start and end at the same time. That part is expected, but they also take longer than they should. Say a single request takes time t under normal conditions. Here, each of the first n queries takes around n * t, while each of the remaining queries takes t. Am I using the concurrency API incorrectly?
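One way to check whether the pool itself causes the n * t effect is to replace the HTTP call with a simulated delay and time each task. The sketch below is only an illustration: the class name `PoolTimingCheck`, the method `measure`, and the sleep-based stand-in for the request are all hypothetical and not part of the original code. If every simulated task takes roughly its configured latency, the executor is probably not what stretches the first n requests, and the HTTP client or the server becomes the more likely suspect.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class PoolTimingCheck {

    // Runs taskCount simulated requests on a work-stealing pool and
    // returns how long each one took, in milliseconds.
    static List<Long> measure(int taskCount, long latencyMs) throws Exception {
        ExecutorService executor = Executors.newWorkStealingPool();
        try {
            List<Callable<Long>> tasks = new ArrayList<>();
            for (int i = 0; i < taskCount; i++) {
                tasks.add(() -> {
                    long start = System.nanoTime();
                    Thread.sleep(latencyMs); // hypothetical stand-in for the HTTP round trip
                    return (System.nanoTime() - start) / 1_000_000;
                });
            }
            List<Long> durations = new ArrayList<>();
            // invokeAll blocks until every task has completed.
            for (Future<Long> future : executor.invokeAll(tasks)) {
                durations.add(future.get());
            }
            return durations;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        int processors = Runtime.getRuntime().availableProcessors();
        System.out.println("processors: " + processors);
        // One task more than the number of processors, mirroring the log above.
        System.out.println("durations (ms): " + measure(processors + 1, 200));
    }
}
```

The durations are measured inside each task, the same way `sendSearchQuery` measures them, so time spent waiting for a free worker is not counted.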

Edit: The server running on SEARCH_URL is deployed on Azure and it can handle multiple requests.

I also tried the following:

Edit 2: I experimented with both the server and the application I'm working on. The odd thing is that the server responds to the n requests at different times, yet the app only receives these responses after a time window that starts when the first request reaches the server and ends when the n-th response reaches the app. I have no explanation for this behavior.

Upvotes: 1

Views: 2358

Answers (2)

Ash

Reputation: 2602

Have you looked at CompletableFuture, which was introduced in Java 8? It could help you send everything out asynchronously.

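List<CompletableFuture<List<String>>> futures = keywords.parallelStream()
            .map(key -> CompletableFuture.supplyAsync(() -> {
                try {
                    return sendSearchQuery(key);
                } catch (UnirestException e) {
                    // A Supplier cannot throw checked exceptions, so wrap it.
                    throw new CompletionException(e);
                }
            }, executor))
            .collect(toList());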

CompletableFuture<Void> allOf = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));

try {
    allOf.join();
} catch (Exception e){
    e.printStackTrace();
}

// Keep only the futures that completed normally; join() on a failed one would throw.
List<String> all = futures.stream().filter(future -> !future.isCompletedExceptionally())
            .flatMap(future -> future.join().stream())
            .collect(toList());

return all;

This sends out all your searches asynchronously. Calling allOf.join() then waits for every one of them to return.

The final stream then merges the individual results into one list and returns it.
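The snippet above relies on `executor`, `sendSearchQuery` and a static import of `toList` from the surrounding class. As a self-contained sketch of the same idea, the version below passes the search in as a function so it can run without a server. The names `AsyncSearchSketch` and `fakeSearch` are hypothetical, and treating a failed query as an empty list is an assumption made for the example, not something the question requires.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.stream.Collectors;

class AsyncSearchSketch {

    // Starts one asynchronous search per keyword, waits for all of them,
    // and merges the results in keyword order.
    static List<String> searchAll(List<String> keywords,
                                  Function<String, List<String>> search,
                                  ExecutorService executor) {
        List<CompletableFuture<List<String>>> futures = keywords.stream()
                .map(key -> CompletableFuture
                        .supplyAsync(() -> search.apply(key), executor)
                        // Assumption: a failed query contributes no results.
                        .exceptionally(error -> Collections.<String>emptyList()))
                .collect(Collectors.toList());

        // Blocks until every future has completed.
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();

        return futures.stream()
                .flatMap(future -> future.join().stream())
                .collect(Collectors.toList());
    }

    // Hypothetical stand-in for sendSearchQuery: fails for the key "bad".
    static List<String> fakeSearch(String key) {
        if (key.equals("bad")) {
            throw new IllegalStateException("simulated failure for " + key);
        }
        return Arrays.asList(key + "-1", key + "-2");
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        try {
            List<String> all = searchAll(
                    Arrays.asList("sky", "bad", "water"),
                    AsyncSearchSketch::fakeSearch,
                    executor);
            System.out.println(all); // prints [sky-1, sky-2, water-1, water-2]
        } finally {
            executor.shutdown();
        }
    }
}
```

Because each future handles its own failure with `exceptionally`, the `allOf(...).join()` call does not throw here, and no filtering is needed before merging.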

Upvotes: 1

Ramandeep Nanda

Reputation: 519

invokeAll(Collection<? extends Callable<T>> tasks)
  • Executes the given tasks, returning a list of Futures holding their status and results when all complete.

Note that invokeAll only returns once all tasks have completed. Is the server you are querying also able to serve multiple requests concurrently?
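The documented invokeAll behavior can be seen in a small sketch with simulated tasks. The class `InvokeAllDemo` and its helper methods are hypothetical names used only for this illustration; the sleeps stand in for remote calls of different durations.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class InvokeAllDemo {

    // Hypothetical stand-in for a remote call: sleeps, then returns its name.
    static Callable<String> simulatedCall(String name, long millis) {
        return () -> {
            Thread.sleep(millis);
            return name;
        };
    }

    // Returns true if every Future is already done when invokeAll returns.
    // Expects a non-empty task list, since the pool is sized to it.
    static boolean allDoneOnReturn(List<Callable<String>> tasks) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(tasks.size());
        try {
            List<Future<String>> futures = executor.invokeAll(tasks);
            return futures.stream().allMatch(Future::isDone);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        List<Callable<String>> tasks = new ArrayList<>();
        tasks.add(simulatedCall("fast", 50));
        tasks.add(simulatedCall("slow", 300));

        long start = System.nanoTime();
        boolean done = allDoneOnReturn(tasks);
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;

        // The wait is governed by the slowest task, not the fastest one.
        System.out.println("all done on return: " + done + ", waited ~" + elapsedMs + " ms");
    }
}
```

This is why the "Received list" lines in the question's log all appear together at the end: the stream over the futures only starts after invokeAll has returned.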

Upvotes: 0
