Alex

Reputation: 250

RxJava: Why don't retryWhen/repeatWhen work?

I've been stuck on this for a day. Inspired by Dan Lew's great post, I tried to write a simple test case for repeatWhen() and retryWhen():

public class ObsTest {

    private static final Logger LOG = LoggerFactory.getLogger(ObsTest.class);

    @Test
    public void test1() throws InterruptedException {

        Observable<Integer> obs = rx.Observable.<Integer>create(observer -> {
            LOG.info("onSubscribe");
            Integer data = RandomUtils.nextInt(0, 1000);
            if (data % 2 != 0) {
                observer.onError(new RuntimeException("Odd number " + data));
            } else {
                observer.onNext(data);
            }
            observer.onCompleted();
        }, BackpressureMode.BUFFER);

        obs.repeatWhen(completed -> completed.delay(1, TimeUnit.MILLISECONDS))
            .retryWhen(error -> error.delay(1, TimeUnit.MILLISECONDS))
            .subscribe(i -> LOG.info("value={}", i), e -> LOG.info("Exception = {}", e.getMessage()));
    }
}

My idea is that this should run forever, emitting even numbers as "correct" results and odd numbers as "errors". Instead, it runs for one or two loops and then stops. And that is with a delay of 1 millisecond; with longer delays (e.g. 1 second), it runs a single time, emitting just a single odd or even number. I'm sure I'm doing something wrong, but I can't find what it is.

Upvotes: 0

Views: 1038

Answers (2)

Krzysztof Skrzynecki

Reputation: 2545

As Dave Moten mentioned, delay uses Schedulers.computation() by default, but you can pass a scheduler of your choice instead. For testing purposes you can use TestScheduler and "take control over time". The code below shows how it can be used: as you can see, this subscription won't terminate for another 30 days, which is basically forever ;)

public class ObsTest {

    @Test
    public void test1() {

        Observable<Integer> obs = rx.Observable.create(observer -> {
            Integer data = RandomUtils.nextInt(0, 1000);
            if (data % 2 != 0) {
                observer.onError(new RuntimeException("Odd number " + data));
            } else {
                observer.onNext(data);
            }
            observer.onCompleted();
        }, Emitter.BackpressureMode.BUFFER);

        TestScheduler scheduler = Schedulers.test();

        AssertableSubscriber<Integer> subscriber = obs.repeatWhen(completed -> completed.delay(1, TimeUnit.MILLISECONDS, scheduler))
            .retryWhen(error -> error.delay(1, TimeUnit.MILLISECONDS, scheduler))
            .subscribeOn(scheduler)
            .test();

        subscriber.assertNoValues();

        scheduler.advanceTimeBy(30, TimeUnit.SECONDS);
        subscriber.assertNoTerminalEvent();

        scheduler.advanceTimeBy(30, TimeUnit.DAYS);
        subscriber.assertNoTerminalEvent();
    }
}
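
In case "taking control over time" sounds like magic, here is a rough sketch of the idea in plain JDK code. This is not how RxJava's TestScheduler is actually implemented; VirtualClock, schedule, advanceTimeBy and countRepeats are hypothetical names made up for this illustration. The point is only that delayed work sits in a queue and runs when the test moves the clock forward, so no real waiting and no background thread is involved.

```java
import java.util.Comparator;
import java.util.PriorityQueue;

public class VirtualTimeSketch {

    /** Hypothetical toy clock: a task runs only once virtual time reaches its due time. */
    static final class VirtualClock {
        private static final class Task {
            final long dueAt;
            final long seq;        // tie-breaker so tasks with equal due times run in FIFO order
            final Runnable action;

            Task(long dueAt, long seq, Runnable action) {
                this.dueAt = dueAt;
                this.seq = seq;
                this.action = action;
            }
        }

        private final PriorityQueue<Task> queue = new PriorityQueue<>(
                Comparator.comparingLong((Task t) -> t.dueAt).thenComparingLong(t -> t.seq));
        private long now = 0;
        private long nextSeq = 0;

        void schedule(Runnable action, long delayMillis) {
            queue.add(new Task(now + delayMillis, nextSeq++, action));
        }

        /** Moves virtual time forward, running every task that becomes due on the way. */
        void advanceTimeBy(long millis) {
            long target = now + millis;
            while (!queue.isEmpty() && queue.peek().dueAt <= target) {
                Task next = queue.poll();
                now = next.dueAt;
                next.action.run();   // may schedule follow-up tasks
            }
            now = target;
        }
    }

    /** A task that re-schedules itself after every run, loosely like a repeat with a delay. */
    static int countRepeats(long periodMillis, long advanceMillis) {
        VirtualClock clock = new VirtualClock();
        int[] runs = {0};
        Runnable[] self = new Runnable[1];
        self[0] = () -> {
            runs[0]++;
            clock.schedule(self[0], periodMillis);
        };
        clock.schedule(self[0], periodMillis);
        clock.advanceTimeBy(advanceMillis);
        return runs[0];
    }

    public static void main(String[] args) {
        // 30 virtual seconds with a 1 ms period, finished without sleeping
        System.out.println(countRepeats(1, 30_000)); // prints 30000
    }
}
```

Everything here runs on the calling thread, which is why assertions made right after an advance are deterministic.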

Upvotes: 0

Dave Moten

Reputation: 12097

When you call delay, which uses Schedulers.computation() by default, you are introducing asynchrony. Once activity starts occurring on a background thread, your test method returns and presumably your process exits before the delayed work gets a chance to run. You need to use a blockingSubscribe or put a longish Thread.sleep at the end.
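
To make the effect concrete, here is a minimal sketch in plain JDK code rather than RxJava. The daemon-thread executor is only a stand-in for the computation scheduler (an assumption for illustration), and AsyncWaitSketch and runTicks are hypothetical names. It compares a caller that returns immediately with one that blocks until the delayed work has run.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class AsyncWaitSketch {

    /**
     * Schedules `ticks` delayed tasks on a background daemon thread and
     * returns how many of them had run by the time the caller moved on.
     */
    static int runTicks(int ticks, long delayMillis, boolean waitForCompletion) {
        ScheduledExecutorService background = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "background");
            t.setDaemon(true);   // a daemon thread does not keep the JVM alive
            return t;
        });
        AtomicInteger seen = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(ticks);
        for (int i = 1; i <= ticks; i++) {
            background.schedule(() -> {
                seen.incrementAndGet();
                done.countDown();
            }, i * delayMillis, TimeUnit.MILLISECONDS);
        }
        try {
            if (waitForCompletion) {
                // Block the calling thread until the background work has finished.
                done.await(5, TimeUnit.SECONDS);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            // Plays the role of "the test is over": pending delayed tasks are dropped.
            background.shutdownNow();
        }
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println("without waiting: " + runTicks(3, 50, false));
        System.out.println("with waiting:    " + runTicks(3, 50, true));
    }
}
```

The non-waiting call typically reports fewer ticks than were scheduled, because the caller is done before the first delay elapses. That matches what the question describes, including the observation that longer delays let even fewer emissions through.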

Upvotes: 2
