Reputation: 2216
I am using vertx-rx-java
I have a handler in which I need to make 2 different requests via the EventBus
and build the response from the replies to these 2 requests.
public void handle(RoutingContext context) {
    ....some code...
    Single<Message<Object>> firstRequest = eb.rxSend("address1", "message1");
    Single<Message<Object>> secondRequest = eb.rxSend("address2", "message2");
    ... TODO ...
}
Basically, I need to combine the two request results and put them into the RoutingContext
response. The problem is that I don't completely understand how to do that in RxJava style.
The only way I was able to think of is something like this:
firstRequest.doOnSuccess(resp1 -> {
    secondRequest.doOnSuccess(resp2 -> {
    });
});
But I think this is a bad approach: what if there are 10 requests instead of 2? The code would then have 10 nested calls.
Is there a better way to combine the results of multiple requests?
Upvotes: 2
Views: 796
Reputation: 3579
the zip operator can be used to associate emissions from multiple sources, with the distinction that it only emits when each of its underlying sources has emitted. so zipping two sources will emit in pairs, and zipping three sources will emit in triplets.
to get a hands-on sense of what i mean, you can refer to the RxMarbles page and play with the emissions in the top two streams while observing the bottom stream.
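to make the pairing rule concrete without pulling in RxJava, here is a minimal plain-Java sketch that models it over two lists. the class name ZipModel and its zip helper are hypothetical and exist only for illustration; this is not the RxJava operator itself, just a model of "emit only when every source has a value":

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;

// Plain-Java model of zip semantics (an illustration, NOT the RxJava operator).
class ZipModel {

    // Combines the i-th element of each list; stops as soon as either list runs out,
    // mirroring "zip only emits when each source has emitted".
    static <A, B, R> List<R> zip(List<A> first, List<B> second, BiFunction<A, B, R> combiner) {
        int pairs = Math.min(first.size(), second.size());
        List<R> out = new ArrayList<>();
        for (int i = 0; i < pairs; i++) {
            out.add(combiner.apply(first.get(i), second.get(i)));
        }
        return out;
    }

    public static void main(String[] args) {
        // The third element "c" has no partner, so it never shows up in the output.
        List<String> zipped = zip(Arrays.asList("a", "b", "c"), Arrays.asList(1, 2), (s, n) -> s + n);
        System.out.println(zipped); // prints [a1, b2]
    }
}
```

a Single emits at most one value, so zipping Singles produces exactly one combined result (or an error).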
with that understanding, you can use the zip operator to combine the results of the Message replies, like this:
Single.zip(firstRequest, secondRequest, (firstReply, secondReply) -> {
    // ...do stuff with the replies and compose some result
    // to be handled in onSuccess()
    return firstReply.body().toString() + secondReply.body().toString();
})
.subscribe(
    result -> {
        System.out.println("## onSuccess(" + result + ")");
    },
    error -> {
        System.err.println("## onError(" + error.getMessage() + ")");
    }
);
if either delivery fails, the onError handler will be triggered; otherwise onSuccess will be triggered.
if, as you mentioned, you have a large number of requests that you'd like to handle at once, there is an overloaded variant of zip that accepts an Iterable of sources. in your case, that might look something like this:
final List<Single<Message<Object>>> requests = asList(firstRequest, secondRequest, ...);
Single.zip(requests, replies -> {
    // replies arrives as an untyped Object[], one element per source
    // ...do stuff with the array of replies
    return null;
})
.subscribe(...);
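with the Iterable variant the combiner receives the replies as an untyped Object[], in the same order as the sources, so each element has to be cast before use. here is a minimal plain-Java sketch of just that combining step. Reply is a hypothetical stand-in for Message<Object> (only body() is modelled), and RepliesCombiner and COMBINE are illustrative names, not part of RxJava or Vert.x:

```java
import java.util.function.Function;

// Sketch of the combiner body for the Iterable variant of zip, without RxJava on the classpath.
class RepliesCombiner {

    // Hypothetical stand-in for Message<Object>; only the body() accessor is modelled.
    static final class Reply {
        private final Object body;

        Reply(Object body) {
            this.body = body;
        }

        Object body() {
            return body;
        }
    }

    // Takes the untyped array of replies, casts each element back to its real type,
    // and concatenates the bodies in source order.
    static final Function<Object[], String> COMBINE = replies -> {
        StringBuilder result = new StringBuilder();
        for (Object raw : replies) {
            Reply reply = (Reply) raw; // the array is untyped, so a cast is required
            result.append(reply.body());
        }
        return result.toString();
    };

    public static void main(String[] args) {
        Object[] replies = { new Reply("foo"), new Reply(42) };
        System.out.println(COMBINE.apply(replies)); // prints foo42
    }
}
```

in the real handler the loop body would cast to Message<Object> instead of Reply, and the returned value is what onSuccess receives.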
hope that helps!
Upvotes: 3