Ruslan Akhundov
Ruslan Akhundov

Reputation: 2216

Waiting for responses from multiple observables in vertx

I am using vertx-rx-java

I have a handler where I need to make 2 different requests via EventBus and create response using responses from these 2 requests.

public handle(RoutingContext context) {
....some code...    

    Single<Message<Object>> firstRequest = eb.rxSend("address1", "message1");
    Single<Message<Object>> secondRequest = eb.rxSend("address2", "message2");
    ... TODO ...
}

Basically I need to combine two request results and put them into RoutingContext response. The problem is that I don't completely understand how to do that in rxjava style. The only way I was able to think of is smth like that:

firstRequest.doOnSuccess(resp1 -> {
  secondRequest.doOnSuccess(resp2 -> {

  });
});

But I think it's a bad way because what if there are 10 requests instead of 2? this code will have 10 nested calls.

Is there any better ways to combine multiple requests results?

Upvotes: 2

Views: 796

Answers (1)

homerman
homerman

Reputation: 3579

the zip operator can be used to associated emissions from multiple sources, with the distinction that it only emits when each of its underlying sources emits. so...

  • in the case that there are two underlying sources, zip will emit in pairs.
  • in the case that there are three underlying sources, zip will emit in triplets.
  • ...etc

to get a hands-on sense of what i mean, you can refer to the RxMarbles page, and play with the emissions in the top two streams while observing the bottom stream.

with that understanding, you can use the zip operator to combine the results of the Message replies, like this:

Single.zip(firstRequest, secondRequest, (firstReply, secondReply) -> {
    // ...do stuff with the replies and compose some result
    //    to be handled in onSuccess()
    return firstReply.body().toString() + secondReply.body().toString();
})
.subscribe(
    result -> {
        System.out.println("## onSuccess(" + result + ")");
    },
    error -> {
        System.err.println("## onError(" + error.getMessage() + ")");
    }
);

if either delivery fails then the onError handler will be triggered. onSuccess will be triggered otherwise.

if, as you mentioned, you have a large number of requests that you'd like to handle at once, there is an overloaded variant of zip that accepts an Iterable of sources. in your case, that might look something like this:

final List<Single<Message<Object>>> requests = asList(firstRequest, secondRequest, ...);

Single.zip(requests, replies -> {
    // ...do stuff with the array of replies
    return null;
})
.subscribe(...);

hope that helps!

Upvotes: 3

Related Questions