mk7.GTI

Reputation: 55

Vertx CompositeFuture: on completion of all Futures

In a Vert.x web server, I have a set of Futures, each of which can either fail or succeed with a result. I am interested in the outcome (and possibly the result) of every one of those Futures, meaning I need to process each Future individually.

I thought Vert.x's CompositeFuture was the way to go. This is my code snippet:

List<Future> futures = dataProviders.stream()
    .filter(dp -> dp.isActive(requester))
    .map(DataProvider::getData)
    .collect(Collectors.toList());

CompositeFuture.all(futures)
        .onComplete(ar -> {
            if(ar.failed()) {
                routingContext.response()
                    .end(ar.cause().getMessage());
                return;
            }
            
            CompositeFuture cf = ar.result();
            JsonArray data = new JsonArray();
            for(int i = 0; i < cf.size(); i++) {
                if(cf.failed(i)) {
                    final JsonObject errorJson = new JsonObject();
                    errorJson.put("error", cf.cause(i).getMessage());
                    data.add(errorJson);
                } else {
                    data.add(((Data) cf.resultAt(i)).toJson());
                }
            }

            JsonObject res = new JsonObject()
                .put("data", data);

            routingContext.response()
                    .putHeader("Content-Type", "application/json")
                    .end(res.toString());
        });

but with that I get the following problems:

What is the correct/best way to wait for a list of Futures to complete, while being able to then handle each result and outcome individually?

Upvotes: 5

Views: 6145

Answers (4)

aamiller17

Reputation: 1

The problem with storing the successes in your own data store is that the data store would need to be thread safe in order to prevent race conditions, so merely using a Java List or JsonArray for this would not be a robust solution. The easiest and least intrusive way to solve this problem is to wrap each of your futures in a Future that is always "successful" but completes with the Throwable cause if the original future fails. This way, you can use the rest of the Vert.x library as intended.

Example:

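import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Wraps a future so that it always succeeds; a failure is delivered as its Throwable cause.
public <T> Future<Object> wrapFuture(Future<T> future) {
  Promise<Object> promise = Promise.promise();
  future.onComplete(asyncResult -> {
    if (asyncResult.succeeded()) {
      promise.complete(asyncResult.result());
    } else {
      promise.complete(asyncResult.cause());
    }
  });
  return promise.future();
}

public void myFunction() {
  List<Future> futures = new ArrayList<>();
  // "operations" is assumed to be a collection of Supplier<Future<Object>>
  for (Supplier<Future<Object>> operation : operations) {
    futures.add(wrapFuture(operation.get()));
  }
  CompositeFuture.join(futures).onComplete(asyncResult -> {
    // Every wrapped future succeeds, so the composite succeeds and result() is non-null
    List<Object> output = asyncResult.result().list();
    for (Object response : output) {
      if (response instanceof Throwable) {
        // this future failed
      } else {
        // this future succeeded
      }
    }
  });
}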
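
The instanceof Throwable check above loses type information, and it cannot tell a failure apart from a future that legitimately succeeded with a Throwable as its value. One possible refinement is to have the wrapper complete with a small result holder instead. The sketch below leaves Vert.x out so that it stays dependency-free; Outcome and all of its method names are hypothetical and not part of Vert.x. In wrapFuture you would then call promise.complete(Outcome.success(...)) or promise.complete(Outcome.failure(...)).

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical holder for the outcome of one future: either a value or a cause.
final class Outcome<T> {
    private final T value;
    private final Throwable cause;

    private Outcome(T value, Throwable cause) {
        this.value = value;
        this.cause = cause;
    }

    // A successful outcome; the value itself may be null.
    static <T> Outcome<T> success(T value) {
        return new Outcome<>(value, null);
    }

    // A failed outcome; the cause must not be null.
    static <T> Outcome<T> failure(Throwable cause) {
        if (cause == null) {
            throw new NullPointerException("cause");
        }
        return new Outcome<>(null, cause);
    }

    boolean succeeded() { return cause == null; }
    boolean failed() { return cause != null; }
    T value() { return value; }
    Throwable cause() { return cause; }

    // Turns a list of outcomes into display strings, one per future, in order.
    static List<String> describe(List<Outcome<String>> outcomes) {
        return outcomes.stream()
            .map(o -> o.succeeded() ? "ok: " + o.value() : "error: " + o.cause().getMessage())
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Outcome<String>> outcomes = Arrays.asList(
            Outcome.<String>success("1"),
            Outcome.<String>failure(new RuntimeException("boom")));
        System.out.println(describe(outcomes)); // prints [ok: 1, error: boom]
    }
}
```

With a holder like this, the loop over the joined results can branch on succeeded() or failed() rather than on the runtime type of each element.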

Upvotes: 0

user2851729

Reputation: 163

I would use join instead of all, so that the CompositeFuture waits for every future to complete even if one of them fails. With all, the composite fails as soon as one future fails, so the handler attached to it can run before all of the async calls have finished.

You can then define a recover step on each future before joining them in the CompositeFuture.

  public static void main(String[] args) {
    List<String> inputs = List.of("1", "Terrible error", "2");

    List<Future> futures = inputs.stream()
        .map(i -> asyncAction(i)
            .recover(thr -> {
              // real recovery or just logging
              System.out.println("Bad thing happen: " + thr.getMessage());
              return Future.succeededFuture();
            }))
        .collect(Collectors.toList());

    CompositeFuture.join(futures)
        .map(CompositeFuture::list)
        .map(results -> results.stream()
            // filter out empty recovered future            
            .filter(Objects::nonNull)
            .collect(Collectors.toList()))
        .onSuccess(System.out::println);
  }

  static Future<String> asyncAction(final String input) {
    if ("Terrible error".equals(input)) {
      return Future.failedFuture(input);
    }
    return Future.succeededFuture(input);
  }

It will print:

Bad thing happen: Terrible error
[1, 2]

Upvotes: 4

injecteer

Reputation: 20699

You can simply query the original futures for their results when using all:

List<Future> futures = //...list of futures

CompositeFuture.all(futures).onComplete(ar -> {
  if(ar.succeeded()){
    futures.forEach(fut -> log.info(fut.succeeded() + " / " + fut.result()));
  }
} );

Upvotes: 2

taygetos

Reputation: 3040

The easiest approach is to handle the results yourself. You can register an onSuccess handler on each of your futures that stores its result in a collection of your own, e.g. a JsonArray.

List<Future> futures = //...list of futures

JsonArray results = new JsonArray();
futures.forEach(e -> e.onSuccess(h -> results.add(h)));

CompositeFuture.all(futures)
    .onComplete(ar -> {
        if(ar.failed()) {
            // successful elements are present in "results"
            routingContext.response().end(results.encode());
            return;
        }
        //... rest of your code
     });

You can also look into the RxJava library; use cases like this are often easier to implement with it.

Upvotes: 5
