fpiechowski

Reputation: 517

Combining many ReactiveX streams into one result stream

I am trying to learn ReactiveX through RxJava, but I can't quite grasp the reactive idea as a whole. My case is the following:

I have a Task class. Its perform() method executes an HTTP request and gets the response through the executeRequest() method. The request may be executed many times (a configured number of repetitions). I want to collect all the results of executeRequest() and combine them into a single Flowable that perform() can return, so that in the end the method returns the results of every request my Task executed.

executeRequest() returns a Single because it executes only one request and yields either one response or none at all (in case of a timeout). In perform() I create a Flowable range of numbers, one per repetition, subscribe to it, and execute one request per repetition. I additionally subscribe to each response Single to log it and to gather the responses into a collection for later. So now I have a set of Singles; how can I merge them into a Flowable to return from perform()? I tried operators like merge(), but I don't understand their parameter types.

I've read some guides on the web, but they are all very general or don't provide examples that match my case.

public Flowable<HttpClientResponse> perform() {

    Long startTime = System.currentTimeMillis();

    List<HttpClientResponse> responses = new ArrayList<>();
    List<Long> failedRepetitionNumbers = new ArrayList<>();

    Flowable.rangeLong(0, repetitions)
            .subscribe(repetition -> {
                logger.debug("Performing repetition {} of {}", repetition + 1, repetitions);

                Long currentTime = System.currentTimeMillis();

                if (durationCap == 0 || currentTime - startTime < durationCap) {

                    Single<HttpClientResponse> response = executeRequest(method, url, headers, body);

                    response.subscribe(successResult -> {
                                logger.info("Received response with code {} in the {}. repetition.", successResult
                                        .statusCode(), repetition + 1);
                                responses.add(successResult);
                            },
                            error -> {
                                logger.error("Failed to receive response from {}.", url);
                                failedRepetitionNumbers.add(repetition);
                            });
                    waitInterval(minInterval, maxInterval);
                } else {
                    logger.info("Reached duration cap of {}ms for task {}.", durationCap, this);
                }
            });

    return Flowable.merge(???);
}

And executeRequest()

private Single<HttpClientResponse> executeRequest(HttpMethod method, String url, LinkedMultiValueMap<String, String>
        headers, JsonNode body) {

    CompletableFuture<HttpClientResponse> responseFuture = new CompletableFuture<>();

    HttpClient client = vertx.createHttpClient();
    HttpClientRequest request = client.request(method, url, responseFuture::complete);
    headers.forEach(request::putHeader);
    request.write(body.toString());
    request.setTimeout(timeout);
    request.end();

    return Single.fromFuture(responseFuture);
}

Upvotes: 1

Views: 413

Answers (1)

Sarath Kn

Reputation: 2745

Instead of subscribing to each observable (each HTTP request) inside your perform() method, just keep chaining the observables. Your code can be reduced to something like this:

    public Flowable<HttpClientResponse> perform() {
        // Return a Flowable that emits up to n items (n = the number of HTTP requests)
        return Flowable.rangeLong(0, repetitions) // start a counter
                .doOnNext(repetition -> logger.debug("Performing repetition {} of {}", repetition + 1, repetitions)) // log the current count
                .flatMap(count -> executeRequest(method, url, headers, body).toFlowable()) // run the request and expose its Single as a Flowable
                .timeout(durationCap, TimeUnit.MILLISECONDS); // fail with TimeoutException if the next response does not arrive within durationCap ms of the previous one
    }
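Two things in the question are not covered by the chain above. Its durationCap check stops starting new repetitions once the total elapsed time passes the cap, whereas timeout() measures the gap between consecutive items. It also logs a failed request and carries on, whereas an error from flatMap() terminates the whole Flowable. Here is a minimal sketch of one way to get closer to that behaviour. It assumes RxJava 2.x (the `io.reactivex` package; in 3.x the same classes live under `io.reactivex.rxjava3.core`). `PerformSketch` and `fakeRequest` are made-up stand-ins for the Task class and executeRequest(), the responses are plain strings, and waitInterval() is left out.

```java
import io.reactivex.Flowable;
import io.reactivex.Single;
import java.util.List;

class PerformSketch {

    // Hypothetical stand-in for executeRequest(): odd repetitions fail.
    static Single<String> fakeRequest(long repetition) {
        return repetition % 2 == 1
                ? Single.error(new RuntimeException("request " + repetition + " failed"))
                : Single.just("response " + repetition);
    }

    static Flowable<String> perform(long repetitions, long durationCapMillis) {
        // defer() so the clock starts at subscription time, not when perform() is called
        return Flowable.defer(() -> {
            long startTime = System.currentTimeMillis();
            return Flowable.rangeLong(0, repetitions)
                    // stop starting new repetitions once the cap is exceeded (0 means no cap)
                    .takeWhile(r -> durationCapMillis == 0
                            || System.currentTimeMillis() - startTime < durationCapMillis)
                    // maxConcurrency = 1 runs one request at a time;
                    // a failed request is logged and dropped instead of ending the stream
                    .flatMapMaybe(r -> fakeRequest(r)
                            .doOnError(e -> System.err.println("Repetition " + r + " failed: " + e.getMessage()))
                            .toMaybe()
                            .onErrorComplete(), false, 1);
        });
    }

    public static void main(String[] args) {
        List<String> results = perform(5, 0).toList().blockingGet();
        System.out.println(results); // [response 0, response 2, response 4]
    }
}
```

If you also need the numbers of the failed repetitions, one option is to map each outcome to a small result object rather than dropping the error, but that depends on what the caller wants to see.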

Finally, subscribe to perform() at the place where you actually need all of this to execute, as shown below:

    perform()
            .subscribeWith(new DisposableSubscriber<HttpClientResponse>() {
                @Override
                public void onNext(HttpClientResponse httpClientResponse) {
                    // called each time a request has finished and its result is ready;
                    // with 5 HTTP requests this fires 5 times, once per "httpClientResponse" (if all calls succeed)
                }

                @Override
                public void onError(Throwable t) {
                    // any error raised while executing these requests,
                    // including a TimeoutException if the timeout elapses in between
                }

                @Override
                public void onComplete() {
                    // called last, if no error happened and onNext has delivered all the results
                }
            });
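One caveat: onError above can only fire if executeRequest() actually signals the failure. As far as I can tell, the CompletableFuture in the question is completed only from the response handler, so a timed-out or failed request would leave it pending and Single.fromFuture() would wait on it indefinitely. Below is a small JDK-only sketch of wiring both outcomes into the future. `FutureBridgeSketch` and `fakeRequest` are hypothetical stand-ins for the callback-style HTTP client; in the real code this would presumably mean registering an exception handler on the request, which is an assumption about the Vert.x version in use.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;

class FutureBridgeSketch {

    // Hypothetical callback-style client: exactly one of the two handlers is invoked.
    static void fakeRequest(boolean succeed,
                            Consumer<String> onResponse,
                            Consumer<Throwable> onFailure) {
        if (succeed) {
            onResponse.accept("200 OK");
        } else {
            onFailure.accept(new IllegalStateException("request timed out"));
        }
    }

    static CompletableFuture<String> execute(boolean succeed) {
        CompletableFuture<String> future = new CompletableFuture<>();
        // Wire BOTH outcomes into the future so that it always completes.
        fakeRequest(succeed, future::complete, future::completeExceptionally);
        return future;
    }

    public static void main(String[] args) throws InterruptedException {
        try {
            System.out.println(execute(true).get());
            execute(false).get();
        } catch (ExecutionException e) {
            // The original failure is available as the cause.
            System.out.println("failed: " + e.getCause().getMessage());
        }
    }
}
```

With the failure path wired in, a Single built from that future has something to pass on to onError instead of waiting forever.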

Upvotes: 1
