Ali Ok

Reputation: 744

RxJava async while loop

I would like to implement an async while loop in Java using RxJava.

More specifically, here is my non-async Java code:

    for (String dataCenter : dataCenters) {
        final Set<Server> serversInDataCenter = getServersInDataCenterSync(dataCenter);
        if (!CollectionUtils.isEmpty(serversInDataCenter)) {
            final Server available = findOneWithSlots(serversInDataCenter);
            if (available != null) {
                return available;
            }
        }
        // if no available server found for current dataCenter, try next
    }
    return null;

The code above finds an available server in one of the data centers, checking them in order.

Since in 90% of cases a server will be available in the first data center checked, I don't want to fetch the servers for all data centers in advance.

Now, imagine the Set<Server> getServersInDataCenterSync(String dataCenter) method is changed to an async one that takes a callback instead: void getServersInDataCenter(String dataCenter, AsyncResultHandler<Set<Server>> handler). That also makes another thing

Upvotes: 1

Views: 1477

Answers (1)

Yaroslav Stavnichiy

Reputation: 21476

    Observable.fromIterable(dataCenters) // emits data center names
        .flatMap(name -> getServersInDataCenter(name), // returns Observable<Server>
            maxConcurrency) // see note below
        .filter(Server::hasSlotsAvailable) // pass through only available ones
        .take(1) // take first one and unsubscribe

In my example, getServersInDataCenter() returns Observable<Server>. To turn a callback-style method into an observable stream, you can use something like the following:

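    Observable<Server> getServersInDataCenter(String name) {
        // The explicit type argument is needed: without it, create() infers
        // Observable<Object> and flatMapIterable(set -> set) does not compile.
        return Observable.<Set<Server>>create(emitter ->
            getServersInDataCenterAsync(name, event -> {
                if (event.isError()) {
                    emitter.onError(event.getError());
                } else {
                    emitter.onNext(event.getResultSet()); // emit Set<Server>
                    emitter.onComplete();
                }
            })) // closes create(...)
            .flatMapIterable(set -> set); // flatten Set into individual items
    }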

With the maxConcurrency parameter you can limit the number of concurrent async requests. If you do not want to make the second request until you have checked all servers from the first data center, set it to 1. If you want to find an available server faster when few are left, increase it. You might want to use the delayErrors parameter as well (see the flatMap docs).
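To make the maxConcurrency = 1 behaviour concrete, here is a rough sketch of the same "try the next data center only if the current one has nothing free" loop. Note that it uses plain JDK CompletableFuture instead of RxJava, so it is not the pipeline above, just an illustration of the ordering. The Server class, the SequentialLookup name, and the fetch function (a stand-in for a future-returning wrapper around the callback method) are all assumptions made up for this example.

```java
import java.util.Iterator;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

class SequentialLookup {

    // Hypothetical stand-in for the question's Server type.
    static final class Server {
        final String name;
        final boolean slotsAvailable;

        Server(String name, boolean slotsAvailable) {
            this.name = name;
            this.slotsAvailable = slotsAvailable;
        }

        boolean hasSlotsAvailable() {
            return slotsAvailable;
        }
    }

    // Queries data centers one at a time. The next data center is only
    // requested after the current result arrived and had no free server.
    static CompletableFuture<Optional<Server>> findAvailable(
            Iterator<String> dataCenters,
            Function<String, CompletableFuture<Set<Server>>> fetch) {
        if (!dataCenters.hasNext()) {
            // Ran out of data centers: the async equivalent of "return null".
            return CompletableFuture.completedFuture(Optional.empty());
        }
        return fetch.apply(dataCenters.next()).thenCompose(servers -> {
            Optional<Server> free = servers.stream()
                    .filter(Server::hasSlotsAvailable)
                    .findFirst();
            if (free.isPresent()) {
                return CompletableFuture.completedFuture(free);
            }
            // Nothing available here, so move on to the next data center.
            return findAvailable(dataCenters, fetch);
        });
    }
}
```

Calling findAvailable(dataCenters.iterator(), fetch) returns a future that completes with the first available server, or with an empty Optional if no data center has one. Unlike flatMap with a higher maxConcurrency, it never has more than one request in flight.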

Upvotes: 2
