Reputation: 1258
I have several api calls (Rx singles) that i want to combine into a single Single. I'm using Single.merge to try combine the result of these calls but when i subscribe to the response i'm getting an empty array as the subscribe has already happened. I call the HealthChecker expecting that the subscribe will return the list of results:
new HealthChecker(vertx)
.getHealthChecks(endpoints)
.subscribe(messages -> {
log.info("Completed health check {}", messages);
routingContext.response()
.putHeader("content-type", "text/json")
.end(messages.toString());
});
The health checker class performs the logic:
public class HealthChecker {
private static Logger log = LoggerFactory.getLogger(HealthChecker.class);
private Vertx vertx;
private WebClient client;
public HealthChecker(Vertx vertx) {
this.vertx = vertx;
client = WebClient.create(vertx);
}
public Single<List<String>> getHealthChecks(JsonArray endpoints) {
return Single.fromCallable(() -> {
List<Single<String>> healthChecks = endpoints
.stream()
.map(endpoint -> getHealthStatus(client, endpoint.toString()))
.collect(Collectors.toList());
return consumeHealthChecks(healthChecks).blockingGet();
});
}
private Single<List<String>> consumeHealthChecks(List<Single<String>> healthChecks) {
return Single.fromCallable(() -> {
List<String> messages = new ArrayList<>();
Single.merge(healthChecks)
.timeout(1500, TimeUnit.MILLISECONDS)
.subscribe(message -> {
log.info("Got health check {}", message);
messages.add(message);
}, error -> {
log.info("Timeout - could not get health check");
});
return messages;
});
}
private Single<String> getHealthStatus(WebClient client, String endpoint) {
log.info("getting endpoint {}", endpoint);
return client
.getAbs(endpoint)
.rxSend()
.map(HttpResponse::bodyAsString)
.map(response -> response);
}
}
I expect the return value to be a list except all i get is an empty list and then the results come after. Here is the log:
09:12:06.235 [vert.x-eventloop-thread-1] INFO sys.health.HealthChecker - getting endpoint http://localhost:5000/status
09:12:06.241 [vert.x-eventloop-thread-1] INFO sys.health.HealthChecker - getting endpoint http://localhost:5001/status
09:12:06.241 [vert.x-eventloop-thread-1] INFO sys.health.HealthChecker - getting endpoint http://localhost:5002/status
09:12:06.241 [vert.x-eventloop-thread-1] INFO sys.health.HealthChecker - getting endpoint http://localhost:5003/status
09:12:06.241 [vert.x-eventloop-thread-1] INFO sys.health.HealthChecker - getting endpoint http://localhost:5004/status
09:12:06.300 [vert.x-eventloop-thread-1] INFO sys.health.HealthCheckVerticle - Completed health check []
09:12:06.688 [vert.x-eventloop-thread-1] INFO sys.health.HealthChecker - Got health check {"isHealthy":true}
09:12:06.844 [vert.x-eventloop-thread-1] INFO sys.health.HealthChecker - Got health check {"isHealthy":true}
09:12:06.898 [vert.x-eventloop-thread-1] INFO sys.health.HealthChecker - Got health check {"isHealthy":false}
09:12:07.072 [vert.x-eventloop-thread-1] INFO sys.health.HealthChecker - Got health check {"isHealthy":true}
09:12:07.255 [vert.x-eventloop-thread-1] INFO sys.health.HealthChecker - Got health check {"isHealthy":true}
Upvotes: 0
Views: 466
Reputation: 4863
Your problem is here:
private Single<List<String>> consumeHealthChecks(List<Single<String>> healthChecks) {
return Single.fromCallable(() -> {
List<String> messages = new ArrayList<>();
Single.merge(healthChecks)
.timeout(1500, TimeUnit.MILLISECONDS)
.subscribe(message -> {
log.info("Got health check {}", message);
messages.add(message);
}, error -> {
log.info("Timeout - could not get health check");
});
return messages;
});
}
You are creating an empty list, then returning it from the lambda, such that the Single returned from consumeHealthChecks
is a Single of empty list...
What I presume you want to do is something like this:
private Single<List<String>> mergeHealthChecks(List<Single<String>> healthChecks) {
return Single.merge(healthChecks)
.timeout(1500, TimeUnit.MILLISECONDS);
}
And then use it like so:
private void consumeHealthChecks() {
Single<List<String>> healthChecks = new HealthChecker(vertx)
.getHealthChecks(endpoints);
mergeHealthChecks(healthChecks)
.subscribe(message -> {
log.info("Merged and consumed all health checks. Final health check: ", message);
}, error -> {
log.info("Timeout - could not merge and consume health checks");
});
}
Note, when you use Single.merge
you will only get the result of the final merged single in the subscribe success callback, so, if you want to log each message as it is successfully consumed, you'll want to hook up a side effecting call to log the message using doOnSuccess
before your subscribe call.
Upvotes: 0
Reputation: 70007
Why are you using fromCallable
and blockingGet
? Also you fire off the merge
without actually waiting for it to run to completion, hence the empty list. Instead, compose over the inner Single
s:
public Single<List<String>> getHealthChecks(JsonArray endpoints) {
return Single.defer(() -> {
List<Single<String>> healthChecks = endpoints
.stream()
.map(endpoint -> getHealthStatus(client, endpoint.toString()))
.collect(Collectors.toList());
return consumeHealthChecks(healthChecks);
});
}
private Single<List<String>> consumeHealthChecks(List<Single<String>> healthChecks) {
return Single.merge(healthChecks)
.timeout(1500, TimeUnit.MILLISECONDS)
.toList();
}
Upvotes: 1