Reputation: 517
I am trying to understand ReactiveX using RxJava, but I can't grasp the whole reactive idea. My case is the following:
I have a Task class. It has a perform() method that executes an HTTP request and gets a response through the executeRequest() method. The request may be executed many times (a defined number of repetitions). I want to grab all the results of executeRequest() and combine them into a Flowable data stream so that I can return this Flowable from perform(). So in the end I want my method to return all the results of the requests that my Task executed.
executeRequest() returns a Single because it executes only one request and may provide only one response, or none at all (in case of a timeout).
In perform() I create a Flowable range of numbers, one for each repetition. In the subscriber to this Flowable I execute one request per repetition. I additionally subscribe to each response Single for logging and for gathering the responses into a collection for later. So now I have a set of Singles; how can I merge them into a Flowable to return from perform()? I tried to mess around with operators like merge(), but I don't understand its parameter types.
I've read some guides on the web, but they are all very general or don't provide examples that match my case.
public Flowable<HttpClientResponse> perform() {
    Long startTime = System.currentTimeMillis();
    List<HttpClientResponse> responses = new ArrayList<>();
    List<Long> failedRepetitionNumbers = new ArrayList<>();
    Flowable.rangeLong(0, repetitions)
            .subscribe(repetition -> {
                logger.debug("Performing repetition {} of {}", repetition + 1, repetitions);
                Long currentTime = System.currentTimeMillis();
                if (durationCap == 0 || currentTime - startTime < durationCap) {
                    Single<HttpClientResponse> response = executeRequest(method, url, headers, body);
                    response.subscribe(successResult -> {
                                logger.info("Received response with code {} in the {}. repetition.",
                                        successResult.statusCode(), repetition + 1);
                                responses.add(successResult);
                            },
                            error -> {
                                logger.error("Failed to receive response from {}.", url);
                                failedRepetitionNumbers.add(repetition);
                            });
                    waitInterval(minInterval, maxInterval);
                } else {
                    logger.info("Reached duration cap of {}ms for task {}.", durationCap, this);
                }
            });
    return Flowable.merge(???);
}
And executeRequest():
private Single<HttpClientResponse> executeRequest(HttpMethod method, String url,
        LinkedMultiValueMap<String, String> headers, JsonNode body) {
    CompletableFuture<HttpClientResponse> responseFuture = new CompletableFuture<>();
    HttpClient client = vertx.createHttpClient();
    HttpClientRequest request = client.request(method, url, responseFuture::complete);
    headers.forEach(request::putHeader);
    request.write(body.toString());
    request.setTimeout(timeout);
    request.end();
    return Single.fromFuture(responseFuture);
}
Upvotes: 1
Views: 413
Reputation: 2745
Instead of subscribing to each observable (each HTTP request) inside your perform() method, just keep chaining the observables. Your code can be reduced to something like this:
public Flowable<HttpClientResponse> perform() {
    // Return a Flowable that can emit up to n times, where n is your number of HTTP requests.
    return Flowable.rangeLong(0, repetitions) // emit one counter value per repetition
            .doOnNext(repetition -> logger.debug("Performing repetition {} of {}", repetition + 1, repetitions)) // log the current repetition
            .flatMap(count -> executeRequest(method, url, headers, body).toFlowable()) // run the request and flatten its Single into the stream
            .timeout(durationCap, TimeUnit.MILLISECONDS); // signal a TimeoutException if the next response does not arrive within durationCap ms of the previous one
}
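To make the chaining idea concrete, here is a minimal, self-contained sketch rather than a drop-in replacement. It assumes RxJava 2 (the io.reactivex package; RxJava 3 uses io.reactivex.rxjava3.core instead), uses String as a stand-in for HttpClientResponse, and replaces executeRequest() with a hypothetical stub that does no HTTP. The class and parameter names are made up for illustration. It also shows the parameter type the question asked about: Single.merge accepts an Iterable of Singles and returns a Flowable. Since timeout(...) bounds the gap between items rather than the total run time, the sketch keeps the question's own duration check with takeWhile.

```java
import io.reactivex.Flowable;
import io.reactivex.Single;

import java.util.Arrays;
import java.util.List;

public class PerformSketch {
    // Hypothetical stand-in for executeRequest(): emits a fake body instead of doing HTTP.
    static Single<String> executeRequest(long repetition) {
        return Single.fromCallable(() -> "response-" + repetition);
    }

    // One request per repetition, executed one after another, in order.
    // No new repetition is let through once the duration cap has passed (0 means no cap).
    static Flowable<String> perform(long repetitions, long durationCapMillis) {
        return Flowable.defer(() -> {
            long startTime = System.currentTimeMillis(); // captured once per subscription
            return Flowable.rangeLong(0, repetitions)
                    .takeWhile(i -> durationCapMillis == 0
                            || System.currentTimeMillis() - startTime < durationCapMillis)
                    .concatMapSingle(PerformSketch::executeRequest);
        });
    }

    // If you already hold a collection of Singles, Single.merge turns it into a Flowable.
    static Flowable<String> mergeCollected(List<Single<String>> singles) {
        return Single.merge(singles);
    }

    public static void main(String[] args) {
        List<String> all = perform(3, 0).toList().blockingGet();
        System.out.println(all); // [response-0, response-1, response-2]

        List<String> merged = mergeCollected(
                Arrays.asList(executeRequest(10), executeRequest(11))).toList().blockingGet();
        System.out.println(merged.size()); // 2
    }
}
```

concatMapSingle is used here so that requests run sequentially, as in the original loop; flatMapSingle would allow them to overlap. The cap is checked when the counter emits a repetition, which because of operator prefetch may be slightly before that request actually starts.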
Finally, you can subscribe to perform() at the place where you actually need to execute all this, as shown below:
perform()
        .subscribeWith(new DisposableSubscriber<HttpClientResponse>() {
            @Override
            public void onNext(HttpClientResponse httpClientResponse) {
                // Called each time a request has finished and its result is ready.
                // With 5 HTTP requests, this can fire 5 times, once per "httpClientResponse" (if all calls succeed).
            }
            @Override
            public void onError(Throwable t) {
                // Called on any error during the execution of these requests,
                // including a TimeoutException if the timeout elapses in between.
            }
            @Override
            public void onComplete() {
                // Called last, if no error occurred and onNext delivered all the results.
            }
        });
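Note that with the chain above, the first failing request goes to onError and ends the whole stream. If you would rather keep going and only record which repetitions failed (as the failedRepetitionNumbers list in the question does), one possible approach is to swallow the error per request. The following is only a sketch under the same assumptions as before: RxJava 2 imports, String in place of HttpClientResponse, and a hypothetical executeRequest() stub in which repetition 1 fails on purpose.

```java
import io.reactivex.Flowable;
import io.reactivex.Single;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class FailureSketch {
    // Hypothetical stand-in for executeRequest(): repetition 1 fails, the others succeed.
    static Single<String> executeRequest(long repetition) {
        if (repetition == 1) {
            return Single.error(new IllegalStateException("simulated timeout"));
        }
        return Single.just("response-" + repetition);
    }

    // Emits only the successful responses; failed repetition numbers go into the given list.
    static Flowable<String> perform(long repetitions, List<Long> failedRepetitions) {
        return Flowable.rangeLong(0, repetitions)
                .concatMapMaybe(repetition -> executeRequest(repetition)
                        .doOnError(e -> failedRepetitions.add(repetition)) // remember the failure
                        .toMaybe()
                        .onErrorComplete()); // turn the error into "no item" so the stream continues
    }

    public static void main(String[] args) {
        List<Long> failed = new CopyOnWriteArrayList<>();
        List<String> ok = perform(3, failed).toList().blockingGet();
        System.out.println(ok);     // [response-0, response-2]
        System.out.println(failed); // [1]
    }
}
```

The subscriber then sees onNext for every success and onComplete at the end, while onError is reserved for failures outside the individual requests.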
Upvotes: 1