Dennis van der Veeke
Dennis van der Veeke

Reputation: 864

VertX HTTP request parallel processing

I'm using the Java VertX framework and I'm trying to load multiple JSON objects using the VertX WebClient and a simple HTTP request. I want to do this in parallel so it speeds up the process.

I have an Endpoint object:

import io.vertx.core.Vertx;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.client.WebClient;
import io.vertx.ext.web.codec.BodyCodec;

public final class Endpoint {

    private final String name;
    private final String url;

    public Endpoint (String name, String url) {
        this.name = name;
        this.url = url;
    }

    public String getName() {
        return name;
    }

    public String getUrl() {
        return url;
    }

    public JsonObject loadJsonObject() {
        WebClient client = WebClient.create(Vertx.vertx());
        client.getAbs(this.getUrl()).as(BodyCodec.jsonObject()).send(handler -> {
            // what to do
        });
        return null;
    }

}

In another class I have the following function that should handle this in parallel (source):

public static void useCompletableFutureWithExecutor(List<Endpoint> tasks) {
    long start = System.nanoTime();
    ExecutorService executor = Executors.newFixedThreadPool(Math.min(tasks.size(), 10));
    List<CompletableFuture<JsonObject>> futures =
        tasks.stream()
             .map(t -> CompletableFuture.supplyAsync(() -> t.loadJsonObject(), executor))
             .collect(Collectors.toList());

    List<JsonObject> result =
        futures.stream()
               .map(CompletableFuture::join)
               .collect(Collectors.toList());
    long duration = (System.nanoTime() - start) / 1_000_000;
    System.out.printf("Processed %d tasks in %d millis\n", tasks.size(), duration);
    System.out.println(result);
    executor.shutdown();
}

I am not sure how to continue from this. The VertX WebClient forces me to use an async handler, meaning I cannot return the JsonObject directly.

Upvotes: 2

Views: 3429

Answers (1)

Alexey Soshin
Alexey Soshin

Reputation: 17741

Your problem is within method signature. Instead of public JsonObject loadJsonObject() { let's start with public Future<JsonObject> loadJsonObject() {

This means that instead of return null; we can return future;, and the future would be defined as Future<JsonObject> future = Future.future();

What's left is to put the data when it arrives:

future.complete(result);

And the final results would look something like:

public Future<JsonObject> loadJsonObject() {
        WebClient client = WebClient.create(Vertx.vertx());
        Future<JsonObject> future = Future.future();
        client.getAbs(this.getUrl()).as(BodyCodec.jsonObject()).send(handler -> {
            // Do something with JSON and put in result
            future.complete(result);

            // Remember to future.fail() in case something went wrong
        });
        return future;
    }

BTW, you also can use CompositeFuture.all() to wait for all your futures.

http://vertx.io/docs/vertx-core/java/#_async_coordination

Upvotes: 1

Related Questions