Michel Krämer

Reputation: 14647

Loop until condition becomes true in RxJava

I want my code to repeat a certain asynchronous operation until this operation is successful (i.e. until it returns true).

At the moment I'm using the following workaround:

Supplier<Observable<Boolean>> myOperation = () -> {
  // do something useful and return 'true' if it was successful
  // NOTE: GENERATING A RANDOM NUMBER IS JUST AN EXAMPLE HERE
  // I WANT TO RUN AN ASYNCHRONOUS OPERATION (LIKE PINGING A SERVER
  // OR THE LIKE) AND RETRY IT UNTIL IT SUCCEEDS.
  System.out.println("Try");
  return Observable.just(Math.random() > 0.9);
};

final Throwable retry = new IllegalStateException();

Observable.<Boolean>create(subscriber -> {
  myOperation.get().subscribe(subscriber);
}).flatMap(b -> b ? Observable.just(b) : Observable.error(retry))
  .retryWhen(exceptions -> exceptions.flatMap(exception -> {
    if (exception == retry) {
      return Observable.timer(1, TimeUnit.SECONDS);
    }
    return Observable.error(exception);
  }))
  .toBlocking()
  .forEach(b -> {
    System.out.println("Connected.");
  });

It works well and prints out something like this:

Try
Try
...
Try
Connected.

The code does what I want, but it doesn't look very elegant. I'm sure there must be a better way. Maybe by using a custom Operator?

Does anybody know how to achieve the same thing in RxJava but in a more readable manner and without the artificial Throwable?
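For comparison, the behaviour described above (run an asynchronous operation, wait, and try again until it yields true) can be sketched without RxJava using only the JDK. This swaps in CompletableFuture and a ScheduledExecutorService; it is not an RxJava solution. The names RetryUntilTrue, retryUntilTrue and attempt are made up for this illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical helper, plain JDK only (no RxJava involved).
final class RetryUntilTrue {

    // Runs 'operation' repeatedly until it yields true.
    // The returned future completes with the number of attempts that were needed.
    static CompletableFuture<Integer> retryUntilTrue(
            Supplier<CompletableFuture<Boolean>> operation,
            long delayMillis,
            ScheduledExecutorService scheduler) {
        CompletableFuture<Integer> result = new CompletableFuture<>();
        attempt(operation, delayMillis, scheduler, 1, result);
        return result;
    }

    private static void attempt(
            Supplier<CompletableFuture<Boolean>> operation,
            long delayMillis,
            ScheduledExecutorService scheduler,
            int attemptNo,
            CompletableFuture<Integer> result) {
        operation.get().whenComplete((ok, err) -> {
            if (err != null) {
                // A real failure is passed on instead of being retried.
                result.completeExceptionally(err);
            } else if (Boolean.TRUE.equals(ok)) {
                result.complete(attemptNo);
            } else {
                // 'false' means "not yet": schedule the next attempt after the delay.
                scheduler.schedule(
                        () -> attempt(operation, delayMillis, scheduler, attemptNo + 1, result),
                        delayMillis, TimeUnit.MILLISECONDS);
            }
        });
    }
}
```

In this sketch a false result is an ordinary value rather than an error, so no marker Throwable is needed; only genuine exceptions end the loop.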

Upvotes: 4

Views: 10536

Answers (1)

Tassos Bassoukos

Reputation: 16142

I don't have much time, so this is from memory...

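import java.util.Iterator;

// An endless source of random numbers that is both the Iterable and its own Iterator.
public class Randomizer implements Iterable<Double>, Iterator<Double> {
  @Override public Iterator<Double> iterator() {return this;}
  @Override public boolean hasNext() {return true;}
  @Override public Double next() {return Math.random();}
}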

...

Observable.from(new Randomizer())
          .takeWhile(value -> value < 0.99);
// or takeUntil(value -> value > 0.99), which also emits the item that matched before completing;
// takeWhile drops the first item that fails its test.
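To make the takeWhile / takeUntil difference concrete, here is a small model of the two on plain lists. It only imitates the semantics as I understand them from RxJava 1.x (takeWhile stops before the first failing item, takeUntil with a stop predicate emits the matching item and then stops); the class TakeSemantics and its methods are made up for this illustration and are not part of RxJava.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical list-based model of the two operators, not RxJava code.
final class TakeSemantics {

    // Keeps items while the predicate holds; the first failing item is dropped.
    static <T> List<T> takeWhile(List<T> source, Predicate<T> keepGoing) {
        List<T> out = new ArrayList<>();
        for (T item : source) {
            if (!keepGoing.test(item)) {
                break;
            }
            out.add(item);
        }
        return out;
    }

    // Emits every item, then stops right after the first one matching the stop predicate.
    static <T> List<T> takeUntil(List<T> source, Predicate<T> stop) {
        List<T> out = new ArrayList<>();
        for (T item : source) {
            out.add(item);
            if (stop.test(item)) {
                break;
            }
        }
        return out;
    }
}
```

For the "retry until it succeeds" case the takeUntil form is probably the one you want, because the successful value itself is still delivered downstream.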

OTOH if you need to do something more complex, look into Observable.defer() and / or a BehaviorSubject.

Edit: Now that I've had more time to read your post, you could try something like this:

Observable.defer(() -> createConnectionObservable())
          .retry((count, err) -> {
              if (count > 9) return false;                     // stop once the retry count exceeds 9
              if (!(err instanceof IOException)) return false; // only retry I/O failures
              return true;
          });
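The retry decision above can also be pulled out and checked on its own. The sketch below restates the same predicate as a plain java.util.function.BiPredicate so it can be exercised without RxJava; the class name RetryPolicy and the constant SHOULD_RETRY are made up for this illustration, and I'm assuming the count passed in starts at 1 for the first failure.

```java
import java.io.IOException;
import java.util.function.BiPredicate;

// Hypothetical holder for the retry rule, plain JDK only.
final class RetryPolicy {

    // Returns true if another attempt should be made after failure number 'count'.
    static final BiPredicate<Integer, Throwable> SHOULD_RETRY = (count, err) -> {
        if (count > 9) {
            return false;               // too many failures, give up
        }
        if (!(err instanceof IOException)) {
            return false;               // unexpected error type, do not retry
        }
        return true;
    };
}
```

With RxJava 1.x the same logic could be handed to retry() as a lambda or method reference, e.g. retry(RetryPolicy.SHOULD_RETRY::test), though I haven't compiled that against the library here.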

Keep in mind that if you use Retrofit you shouldn't need defer(), as Retrofit re-initiates the call whenever a new subscription happens.

Upvotes: 2
