Reputation: 744
I would like to implement an async while loop in Java using RxJava.
More specifically, here is my synchronous Java code:
for (String dataCenter : dataCenters) {
    final Set<Server> serversInDataCenter = getServersInDataCenterSync(dataCenter);
    if (!CollectionUtils.isEmpty(serversInDataCenter)) {
        final Server available = findOneWithSlots(serversInDataCenter);
        if (available != null) {
            return available;
        }
    }
    // if no available server found for current dataCenter, try next
}
return null;
The code above checks the data centers one by one and returns the first server that has free slots.
Since in 90% of cases a server will be available in the first data center checked, I don't want to fetch all servers for all data centers in advance.
Now, imagine the Set<Server> getServersInDataCenterSync(String dataCenter) method is changed to an async one that takes a callback instead: void getServersInDataCenter(String dataCenter, AsyncResultHandler<Set<Server>> handler). That also makes another thing
Upvotes: 1
Views: 1477
Reputation: 21476
Observable.fromIterable(dataCenters)                  // emits data center names
    .flatMap(name -> getServersInDataCenter(name),    // returns Observable<Server>
             maxConcurrency)                          // see note below
    .filter(Server::hasSlotsAvailable)                // pass through only available ones
    .take(1)                                          // take first one and unsubscribe
In my example, getServersInDataCenter() returns Observable<Server>. To turn a callback-style method into an observable stream, you can use something like the following:
Observable<Server> getServersInDataCenter(String name) {
    // The explicit type argument is needed so that flatMapIterable sees a Set<Server>
    return Observable.<Set<Server>>create(emitter ->
            getServersInDataCenterAsync(name, event -> {
                if (event.isError()) {
                    emitter.onError(event.getError());
                } else {
                    emitter.onNext(event.getResultSet()); // emit Set<Server>
                    emitter.onComplete();
                }
            }))
        .flatMapIterable(set -> set); // flatten Set into individual items
}
With the maxConcurrency parameter you can limit the number of concurrent async requests. If you do not want to make a second request until you have checked all servers from the first data center, set it to 1. If you want to find an available server faster when few are left, increase it. You might want to use the delayErrors parameter as well (see the flatMap docs).
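For comparison, the strictly sequential case (the equivalent of setting maxConcurrency to 1) can also be sketched with only the JDK, using CompletableFuture instead of RxJava. This is a rough sketch under assumptions, not the API from the question: the Server class, the ServerLookup interface and its BiConsumer handler are hypothetical stand-ins for Server and AsyncResultHandler<Set<Server>>, and the names lookupAsync and findAvailable are made up for illustration.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

class AsyncServerSearch {

    /** Hypothetical stand-in for the Server type from the question. */
    static final class Server {
        final String name;
        final boolean hasSlots;

        Server(String name, boolean hasSlots) {
            this.name = name;
            this.hasSlots = hasSlots;
        }
    }

    /** Hypothetical callback-style API: the handler receives either a result or an error. */
    interface ServerLookup {
        void getServersInDataCenter(String dataCenter, BiConsumer<Set<Server>, Throwable> handler);
    }

    /** Bridges the callback-style call into a CompletableFuture. */
    static CompletableFuture<Set<Server>> lookupAsync(ServerLookup lookup, String dataCenter) {
        CompletableFuture<Set<Server>> future = new CompletableFuture<>();
        lookup.getServersInDataCenter(dataCenter, (servers, error) -> {
            if (error != null) {
                future.completeExceptionally(error);
            } else {
                future.complete(servers);
            }
        });
        return future;
    }

    /** Queries data centers one at a time; the next lookup starts only if the previous one found nothing. */
    static CompletableFuture<Optional<Server>> findAvailable(ServerLookup lookup, Iterator<String> dataCenters) {
        if (!dataCenters.hasNext()) {
            // All data centers checked, nothing available
            return CompletableFuture.completedFuture(Optional.empty());
        }
        return lookupAsync(lookup, dataCenters.next()).thenCompose(servers -> {
            Optional<Server> available = (servers == null)
                    ? Optional.<Server>empty()
                    : servers.stream().filter(s -> s.hasSlots).findFirst();
            if (available.isPresent()) {
                return CompletableFuture.completedFuture(available);
            }
            return findAvailable(lookup, dataCenters); // try the next data center
        });
    }

    public static void main(String[] args) {
        // In-memory fake: "dc-east" has no servers, every other data center has one free server
        ServerLookup fake = (dc, handler) -> {
            if (dc.equals("dc-east")) {
                handler.accept(Collections.emptySet(), null);
            } else {
                handler.accept(Collections.singleton(new Server(dc + "-1", true)), null);
            }
        };
        Optional<Server> found =
                findAvailable(fake, Arrays.asList("dc-east", "dc-west").iterator()).join();
        System.out.println(found.map(s -> "Found: " + s.name).orElse("No server available"));
    }
}
```

If the callback is invoked synchronously, each retry adds a stack frame, which is fine for a short list of data centers but would need a different approach for a very long one. Unlike the RxJava pipeline, this version has no knob for querying several data centers concurrently.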
Upvotes: 2