Reputation: 55
In a Vert.x web server, I have a set of Futures, each of which can either fail or succeed with a result. I am interested in the outcome (and possibly the result) of each and every one of those Futures, meaning I need to process the result of each Future.
I was thinking that Vert.x's CompositeFuture was the way to go. This is my code snippet:
List<Future> futures = dataProviders.stream()
    .filter(dp -> dp.isActive(requester))
    .map(DataProvider::getData)
    .collect(Collectors.toList());

CompositeFuture.all(futures)
    .onComplete(ar -> {
        if (ar.failed()) {
            // end() expects a String or Buffer, not a Throwable
            routingContext.response()
                .end(ar.cause().getMessage());
            return;
        }
        CompositeFuture cf = ar.result();
        JsonArray data = new JsonArray();
        for (int i = 0; i < cf.size(); i++) {
            if (cf.failed(i)) {
                final JsonObject errorJson = new JsonObject();
                errorJson.put("error", cf.cause(i).getMessage());
                data.add(errorJson);
            } else {
                data.add(((Data) cf.resultAt(i)).toJson());
            }
        }
        JsonObject res = new JsonObject()
            .put("data", data);
        routingContext.response()
            .putHeader("Content-Type", "application/json")
            .end(res.toString());
    });
But with that I get the following problems:
- With CompositeFuture.all(futures).onComplete(), I don't get the results of a succeeded Future as soon as any Future out of futures fails (because then ar.result() is null).
- With CompositeFuture.any(futures).onComplete(), I would get all results, but the CompositeFuture completes before all Futures of futures are completed. Meaning, it does not wait for every Future to complete, but completes as soon as any Future is completed (so cf.resultAt(i) returns null).
- With CompositeFuture.join(futures).onComplete(), it's the same as with all(): ar.result() is null as soon as any Future fails.

What is the correct/best way to wait for a list of Futures to complete, while being able to then handle each result and outcome individually?
Upvotes: 5
Views: 6145
Reputation: 1
The problem with storing the successes in your own data store is that the data store would need to be thread safe in order to prevent race conditions. So merely using a Java List or JsonArray for this would not be a robust solution. The easiest and least intrusive way to solve this problem is to create a wrapper Future around each of your futures which is always "successful", but completes with the Throwable cause as its result if the original future fails. This way, you can use the rest of the Vert.x library as intended.
Example:
// Wraps a future so that it always succeeds: on failure, the Throwable
// cause becomes the result instead.
public Future<Object> wrapFuture(Future<Object> future) {
    Promise<Object> promise = Promise.promise();
    future.onComplete(asyncResult -> {
        if (asyncResult.succeeded()) {
            promise.complete(asyncResult.result());
        } else {
            promise.complete(asyncResult.cause());
        }
    });
    return promise.future();
}

public void myFunction() {
    List<Future> futures = new ArrayList<>();
    // "operations" is a placeholder for your own collection of
    // Supplier<Future<Object>> tasks
    for (Supplier<Future<Object>> operation : operations) {
        futures.add(wrapFuture(operation.get()));
    }
    CompositeFuture.join(futures).onComplete(asyncResult -> {
        // every wrapped future succeeds, so the join succeeds as well
        List<Object> output = asyncResult.result().list();
        for (Object response : output) {
            if (response instanceof Throwable) {
                // this future failed
            } else {
                // this future succeeded
            }
        }
    });
}
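The instanceof check above can be avoided by wrapping each result in a small typed outcome object. The following is only a sketch of that idea, and it uses the JDK's CompletableFuture instead of Vert.x futures so that it runs without any dependency. Outcome, settle and settleAll are hypothetical names made up for this illustration; they are not part of Vert.x or the JDK. With Vert.x, the corresponding step would be to wrap each Future as in wrapFuture before passing the list to CompositeFuture.join.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class SettleAll {

    // Hypothetical holder for the result of one future: "error" is null
    // when the future succeeded, otherwise it holds the failure cause.
    public static final class Outcome<T> {
        public final T value;
        public final Throwable error;

        Outcome(T value, Throwable error) {
            this.value = value;
            this.error = error;
        }

        public boolean succeeded() {
            return error == null;
        }
    }

    // Turns a future that may fail into one that always completes normally.
    public static <T> CompletableFuture<Outcome<T>> settle(CompletableFuture<T> future) {
        return future.handle((value, error) -> new Outcome<>(value, error));
    }

    // Waits for every future and returns the outcomes in input order.
    public static <T> CompletableFuture<List<Outcome<T>>> settleAll(List<CompletableFuture<T>> futures) {
        List<CompletableFuture<Outcome<T>>> settled = futures.stream()
                .map(SettleAll::settle)
                .collect(Collectors.toList());
        return CompletableFuture
                .allOf(settled.toArray(new CompletableFuture[0]))
                // all settled futures are complete here, so join() does not block
                .thenApply(ignored -> settled.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));
    }

    public static void main(String[] args) {
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("Terrible error"));

        List<Outcome<String>> outcomes = settleAll(List.of(
                CompletableFuture.completedFuture("1"),
                failed,
                CompletableFuture.completedFuture("2"))).join();

        for (Outcome<String> o : outcomes) {
            System.out.println(o.succeeded() ? "ok: " + o.value : "error: " + o.error.getMessage());
        }
    }
}
```

Compared with returning the Throwable as a plain Object, a typed holder keeps a legitimate result that happens to be a Throwable from being mistaken for a failure, at the cost of one extra class.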
Upvotes: 0
Reputation: 163
I would use the join operation instead of all, so that the CompositeFuture waits for all futures to complete even if one of them fails. Otherwise, the handler bound to the CompositeFuture could run before all async calls have finished.
Then you could define a recover step on each future before joining them in the CompositeFuture.
public static void main(String[] args) {
    List<String> inputs = List.of("1", "Terrible error", "2");
    List<Future> futures = inputs.stream()
        .map(i -> asyncAction(i)
            .recover(thr -> {
                // real recovery or just logging
                System.out.println("Bad thing happen: " + thr.getMessage());
                return Future.succeededFuture();
            }))
        .collect(Collectors.toList());
    CompositeFuture.join(futures)
        .map(CompositeFuture::list)
        .map(results -> results.stream()
            // filter out empty recovered future
            .filter(Objects::nonNull)
            .collect(Collectors.toList()))
        .onSuccess(System.out::println);
}

static Future<String> asyncAction(final String input) {
    if ("Terrible error".equals(input)) {
        return Future.failedFuture(input);
    }
    return Future.succeededFuture(input);
}
It will print:
Bad thing happen: Terrible error
[1, 2]
Upvotes: 4
Reputation: 20699
You can simply poke the original futures for their results when using all:
List<Future> futures = //...list of futures
CompositeFuture.all(futures).onComplete(ar -> {
    if (ar.succeeded()) {
        futures.forEach(fut -> log.info(fut.succeeded() + " / " + fut.result()));
    }
});
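Inspecting the original futures is only reliable once every one of them has completed. As the question notes, all() completes as soon as one future fails, so for mixed outcomes the same idea would presumably be combined with join(). The sketch below illustrates the "wait for all, then look at each original future" pattern with the JDK's CompletableFuture instead of Vert.x futures, so that it runs without dependencies; allOf() only completes after every input has completed, which is the behaviour assumed here. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class InspectOriginals {

    // Waits until every future has completed (normally or exceptionally),
    // then describes each original future individually, in input order.
    public static CompletableFuture<List<String>> describeAll(List<CompletableFuture<String>> futures) {
        return CompletableFuture
                .allOf(futures.toArray(new CompletableFuture[0]))
                // handle() runs whether or not any of the inputs failed
                .handle((ignored, anyError) -> {
                    List<String> lines = new ArrayList<>();
                    for (CompletableFuture<String> f : futures) {
                        // f is already complete, so this join() does not block
                        lines.add(f.handle((value, error) -> error == null
                                ? "succeeded: " + value
                                : "failed: " + error.getMessage()).join());
                    }
                    return lines;
                });
    }

    public static void main(String[] args) {
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("Terrible error"));

        describeAll(List.of(CompletableFuture.completedFuture("1"), failed))
                .join()
                .forEach(System.out::println);
    }
}
```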
Upvotes: 2
Reputation: 3040
The easiest way is to handle the results yourself. You can register an onSuccess handler on each of your futures. This way the results will be collected in some kind of list, e.g. a JsonArray.
List<Future> futures = //...list of futures
JsonArray results = new JsonArray();
futures.forEach(e -> e.onSuccess(h -> results.add(h)));
CompositeFuture.all(futures)
    .onComplete(ar -> {
        if (ar.failed()) {
            // successful elements are present in "results"
            routingContext.response().end(results.encode());
            return;
        }
        //... rest of your code
    });
You can also look into the rx-java library. Such use cases are usually better implemented using it.
Upvotes: 5