Orkun
Orkun

Reputation: 7228

Sequential composition for arbitrary number of calls in Vertx with Futures

We use Futures in vertx in examples like:

Future<JsonObject> fetchVehicle = getUserBookedVehicle(routingContext, client);

        fetchVehicle.compose(vehicleJson -> vehicleDoor(routingContext, client, vehicleJson, lock)).setHandler(
                asyncResult -> {
                    if (asyncResult.succeeded()) {
                    LOG.info("Door operation succeeded with result {}", asyncResult.result().encode());
                    handler.handle(Future.succeededFuture(new AsyncReply(200, "OK")));
                }
                else {
                    handler.handle(Future.failedFuture(asyncResult.cause()));
                }
        });

where we handle 2 calls for example.

OR I have another snippet where I can handle any number of methods:

List<Future> futures = new ArrayList<>();
        conversation.getRequestList().forEach(req -> {
            Future<Message<Object>> senderFuture = Future.future();
            vertx.eventBus().send(AbstractOEMClientVerticle.ADDRESS, JsonObject.mapFrom(req), deliveryOptions, senderFuture.completer());

            // sent successfully. save the replyAddress and the conversation for later/callback
            log.info("Saving the conversation for the request.", conversation.getReplyAddress());
            pendingCommands.put(req.getBody().getString(MSG_ID), conversation);

            futures.add(senderFuture);
        });

        CompositeFuture.all(futures).setHandler(ar -> {
            if (ar.succeeded()) {
                handler.handle(Future.succeededFuture());
            } else {
                log.error("forwardToVWClient VW got result : {}", ar.cause());
                handler.handle(Future.failedFuture(ar.cause()));
            }
        });

Here we are chaining all the requests in the conversation.getRequestList() without knowing their count in advance.

But the shortcoming of .all() method is that, we have no control on the order.

How can I chain any number of methods with Vertx Futures (without knowing the exact count of the calls) ?

EDIT:

The official guide talks about sequential composition but the example given has 3 calls. It does not explain how to do it for arbitrary number of calls.

See "Sequential composition" in http://vertx.io/docs/vertx-core/java/

I hope it is clear.

Upvotes: 6

Views: 3079

Answers (4)

0x4b50
0x4b50

Reputation: 679

You can create a single Future, loop over your futures and use compose to chain them:

// single future variable
Future<Void> myFuture = Future.succeededFuture();

// iterate `myCount` times
for (int i = 0; i < myCount; i++) {
  // chain a call to myFuture from above
  myFuture.compose(v -> {
    // do something async
    return someMethodThatReturnsAFuture();
  }).compose(futureResult -> {
    // do something else async
    return someOtherMethodReturningAFuture(futureResult);
  });
}

// all futures are chained and executed after each other
// if one fails, consecutive futures are not executed and myFuture will fail
// if all succeed, myFuture will succeed too 
myFuture.onComplete(...);

Upvotes: 0

Orkun
Orkun

Reputation: 7228

Here is a solution using map & reduce that executes a method in an orderly fashion and returns the accumulated result in the form of a Future<String>

 public static <T> Future<String> chainCall(List<T> list, Function<T, Future<String>> method){
        return list.stream().reduce(Future.succeededFuture(),// the initial "future"
                (acc, item) -> acc.compose(v -> method.apply(item)), // we return the compose of the previous "future" with "future" returned by next item processing
                (a,b) -> Future.future()); // not used! only useful for parallel stream.
    }

can be used as in the example below:

 chainCall(conversation.getRequestList(), this::sendApiRequestViaBus);

where sendApiRequestViaBus is:

/**
     * @param request The request to process
     * @return The result of the request processing. 
     */
    Future<String> sendApiRequestViaBus(ApiRequest request) {
        Future<String> future = Future.future();
        String address = CommandUtilsFactory.getInstance(request.getImplementation()).getApiClientAddress();
        log.debug("Chain call start msgId {}", request.getId());

        vertx.eventBus().send(address, JsonObject.mapFrom(request), deliveryOptions, res -> {
            log.debug("Chain call returns {}", request.getId());
            if (res.succeeded()) {
                future.complete("OK");
            } else {
                future.fail("KO");
            }
        });
        return future;
    }

I hope it helps.

Upvotes: 5

Shaikh Salman
Shaikh Salman

Reputation: 57

Here's something handy. Hope it helps.

public static <R> Future<List<R>> allOfFutures(List<Future<R>> futures) {
    return CompositeFutureImpl.all(futures.toArray(new Future[futures.size()]))
            .map(v -> futures.stream()
                    .map(Future::result)
                    .collect(Collectors.toList())
            );
}

Upvotes: 1

Zhao Shilong
Zhao Shilong

Reputation: 41

If you want to feed the response from the previous request to the next request, and suppose you have different handlers for each response. You can add a helper method

private <T> Future<T> chain(Future<T> init, List<Function<T, Future<T>>> handlers) {
    Future<T> result = init;
    for (Function<T, Future<T>> handler : handlers) {
        result = result.compose(handler);
    }
    return result;
}

And then change your code like this

    Future<JsonObject> fetchVehicle = getUserBookedVehicle(routingContext, client);

    Function<JsonObject, Future<JsonObject>> vehicleResponseHandler = vehicleJson ->
        vehicleDoor(routingContext, client, vehicleJson, lock);

    Function<JsonObject, Future<JsonObject>> anotherTrivialHandler = someJsonObj -> {
        // add here new request by using information from someJsonObj
        LOG.info("Hello from trivial handler {} ", someJsonObj);
        return Future.succeededFuture(someJsonObj);
    };

    List<Function<JsonObject, Future<JsonObject>>> handlers = new ArrayList<>();

    handlers.add(vehicleResponseHandler);
    handlers.add(anotherTrivialHandler);

    chain(fetchVehicle, handlers).setHandler( asyncResult -> {
        if (asyncResult.succeeded()) {
            handler.handle(Future.succeededFuture(new AsyncReply(200, "OK")));
        } else {
            handler.handle(Future.failedFuture(asyncResult.cause()));
        }
    });

But there is a limitation for this implementation which requires each chained Future must have the same type parameter T.

Upvotes: 4

Related Questions