dale

Reputation: 1258

RxJava Single not returning from Single.merge

I have several API calls (Rx Singles) that I want to combine into a single Single. I'm using Single.merge to try to combine the results of these calls, but when I subscribe to the response I get an empty array, because the subscribe callback has already fired. I call the HealthChecker expecting that the subscribe callback will receive the list of results:

     new HealthChecker(vertx)
        .getHealthChecks(endpoints)
        .subscribe(messages -> {
            log.info("Completed health check {}", messages);
            routingContext.response()
                          .putHeader("content-type", "text/json")
                          .end(messages.toString());
        });

The health checker class performs the logic:

public class HealthChecker {

    private static Logger log = LoggerFactory.getLogger(HealthChecker.class);

    private Vertx vertx;
    private WebClient client;

    public HealthChecker(Vertx vertx) {
        this.vertx = vertx;
        client = WebClient.create(vertx);
    }

    public Single<List<String>> getHealthChecks(JsonArray endpoints) {
        return Single.fromCallable(() -> {

            List<Single<String>> healthChecks = endpoints
                .stream()
                .map(endpoint -> getHealthStatus(client, endpoint.toString()))
                .collect(Collectors.toList());

            return consumeHealthChecks(healthChecks).blockingGet();

        });
    }

    private Single<List<String>> consumeHealthChecks(List<Single<String>> healthChecks) {
        return Single.fromCallable(() -> {
            List<String> messages = new ArrayList<>();

            Single.merge(healthChecks)
                  .timeout(1500, TimeUnit.MILLISECONDS)
                  .subscribe(message -> {
                      log.info("Got health check {}", message);
                      messages.add(message);
                  }, error -> {
                      log.info("Timeout - could not get health check");

                  });

            return messages;
        });
    }

    private Single<String> getHealthStatus(WebClient client, String endpoint) {
        log.info("getting endpoint {}", endpoint);

        return client
            .getAbs(endpoint)
            .rxSend()
            .map(HttpResponse::bodyAsString)
            .map(response -> response);

    }
}

I expect the return value to be a list of results, but all I get is an empty list, and the results arrive afterwards. Here is the log:

09:12:06.235 [vert.x-eventloop-thread-1] INFO  sys.health.HealthChecker - getting endpoint http://localhost:5000/status
09:12:06.241 [vert.x-eventloop-thread-1] INFO  sys.health.HealthChecker - getting endpoint http://localhost:5001/status
09:12:06.241 [vert.x-eventloop-thread-1] INFO  sys.health.HealthChecker - getting endpoint http://localhost:5002/status
09:12:06.241 [vert.x-eventloop-thread-1] INFO  sys.health.HealthChecker - getting endpoint http://localhost:5003/status
09:12:06.241 [vert.x-eventloop-thread-1] INFO  sys.health.HealthChecker - getting endpoint http://localhost:5004/status
09:12:06.300 [vert.x-eventloop-thread-1] INFO  sys.health.HealthCheckVerticle - Completed health check []
09:12:06.688 [vert.x-eventloop-thread-1] INFO  sys.health.HealthChecker - Got health check {"isHealthy":true}
09:12:06.844 [vert.x-eventloop-thread-1] INFO  sys.health.HealthChecker - Got health check {"isHealthy":true}
09:12:06.898 [vert.x-eventloop-thread-1] INFO  sys.health.HealthChecker - Got health check {"isHealthy":false}
09:12:07.072 [vert.x-eventloop-thread-1] INFO  sys.health.HealthChecker - Got health check {"isHealthy":true}
09:12:07.255 [vert.x-eventloop-thread-1] INFO  sys.health.HealthChecker - Got health check {"isHealthy":true}

Upvotes: 0

Views: 466

Answers (2)

Thomas Cook

Reputation: 4863

Your problem is here:

    private Single<List<String>> consumeHealthChecks(List<Single<String>> healthChecks) {
        return Single.fromCallable(() -> {
            List<String> messages = new ArrayList<>();

            Single.merge(healthChecks)
                .timeout(1500, TimeUnit.MILLISECONDS)
                .subscribe(message -> {
                    log.info("Got health check {}", message);
                    messages.add(message);
                }, error -> {
                    log.info("Timeout - could not get health check");
                });

            return messages;
        });
    }

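You create an empty list and return it from the lambda straight away. The subscribe call does not wait for the results, so nothing has been added to the list by the time it is returned, and the Single returned from consumeHealthChecks is a Single of an empty list.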

What I presume you want to do is something like this:

    private Single<List<String>> mergeHealthChecks(List<Single<String>> healthChecks) {
        // Single.merge yields a Flowable<String>; toList collects it into a Single<List<String>>
        return Single.merge(healthChecks)
                .timeout(1500, TimeUnit.MILLISECONDS)
                .toList();
    }

And then use it like so:

    private void consumeHealthChecks(List<Single<String>> healthChecks) {
        mergeHealthChecks(healthChecks)
                .subscribe(messages -> {
                    log.info("Merged and consumed all health checks: {}", messages);
                }, error -> {
                    log.info("Timeout - could not merge and consume health checks");
                });
    }

Note that Single.merge returns a Flowable that emits each result as it arrives, and once it is collected with toList the subscribe success callback receives only the final list. If you want to log each message as it is successfully consumed, hook up a side-effecting call with doOnNext before toList.

Upvotes: 0

akarnokd

Reputation: 70007

Why are you using fromCallable and blockingGet? Also, you fire off the merge without waiting for it to run to completion, hence the empty list. Instead, compose over the inner Singles:

public Single<List<String>> getHealthChecks(JsonArray endpoints) {
    return Single.defer(() -> {

        List<Single<String>> healthChecks = endpoints
            .stream()
            .map(endpoint -> getHealthStatus(client, endpoint.toString()))
            .collect(Collectors.toList());

        return consumeHealthChecks(healthChecks);
    });
}

private Single<List<String>> consumeHealthChecks(List<Single<String>> healthChecks) {
    return Single.merge(healthChecks)
                 .timeout(1500, TimeUnit.MILLISECONDS)
                 .toList();
}
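
The difference between firing off the work and composing over it can be reproduced without RxJava. The following is only a rough sketch using java.util.concurrent, with CompletableFuture standing in for Single. The class MergeSketch and the methods check, fireAndForget and composed are hypothetical names invented for illustration; they are not part of the question's code or of any library.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;

class MergeSketch {

    // Hypothetical stand-in for getHealthStatus: completes only after the gate opens,
    // which simulates a slow HTTP call.
    static CompletableFuture<String> check(String name, CountDownLatch gate) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                gate.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
            return name + ":ok";
        });
    }

    // Same shape as the question: register callbacks, then return the list right away.
    static List<String> fireAndForget(List<CompletableFuture<String>> checks) {
        List<String> messages = Collections.synchronizedList(new ArrayList<>());
        checks.forEach(c -> c.thenAccept(messages::add));
        return messages; // returned before any callback has run
    }

    // Composed shape: the returned future completes only once every check has completed.
    // Unlike Single.merge, which emits in arrival order, this keeps the input order.
    static CompletableFuture<List<String>> composed(List<CompletableFuture<String>> checks) {
        return CompletableFuture
                .allOf(checks.toArray(new CompletableFuture[0]))
                .thenApply(ignored -> checks.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));
    }

    public static void main(String[] args) {
        CountDownLatch gate = new CountDownLatch(1);
        List<CompletableFuture<String>> checks =
                Arrays.asList(check("a", gate), check("b", gate));

        // The simulated calls are still blocked on the gate, so the list is empty here.
        System.out.println("fire-and-forget returned: " + fireAndForget(checks));

        CompletableFuture<List<String>> all = composed(checks);
        gate.countDown(); // let the simulated calls finish
        System.out.println("composed result: " + all.join());
    }
}
```

In this sketch the first method hands back its list while the simulated calls are still pending, which mirrors the empty `[]` in the question's log, while the second hands back a future that resolves to the full list. The sketch does not model the 1500 ms timeout.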

Upvotes: 1
