Reputation: 250
I've been stuck on this for a day. Inspired by Dan Lew's great post, I tried to write a simple test case for repeatWhen() and retryWhen():
public class ObsTest {
    private static final Logger LOG = LoggerFactory.getLogger(ObsTest.class);

    @Test
    public void test1() throws InterruptedException {
        Observable<Integer> obs = rx.Observable.<Integer>create(observer -> {
            LOG.info("onSubscribe");
            Integer data = RandomUtils.nextInt(0, 1000);
            if (data % 2 != 0) {
                observer.onError(new RuntimeException("Odd number " + data));
            } else {
                observer.onNext(data);
            }
            observer.onCompleted();
        }, BackpressureMode.BUFFER);
        obs.repeatWhen(completed -> completed.delay(1, TimeUnit.MILLISECONDS))
            .retryWhen(error -> error.delay(1, TimeUnit.MILLISECONDS))
            .subscribe(i -> LOG.info("value={}", i), e -> LOG.info("Exception = {}", e.getMessage()));
    }
}
My idea is that this should run forever, emitting even numbers as "correct" results and odd numbers as "errors". Instead, it runs for one or two loops and then stops. And that is with a delay of 1 millisecond; with longer delays (e.g. 1 second), it runs a single time, emitting just one odd or even number. I'm sure I'm doing something wrong, but I can't find what it is.
Upvotes: 0
Views: 1038
Reputation: 2545
As Dave Moten mentioned, delay uses Schedulers.computation() by default, but you can pass a scheduler of your choice instead. For testing purposes you can use TestScheduler and "take control over time". The code below shows how it can be used. As you can see, this subscription won't terminate for another 30 days, which is basically forever ;)
public class ObsTest {
    @Test
    public void test1() {
        Observable<Integer> obs = rx.Observable.<Integer>create(observer -> {
            Integer data = RandomUtils.nextInt(0, 1000);
            if (data % 2 != 0) {
                observer.onError(new RuntimeException("Odd number " + data));
            } else {
                observer.onNext(data);
            }
            observer.onCompleted();
        }, Emitter.BackpressureMode.BUFFER);

        // Virtual-time scheduler: nothing runs until time is advanced explicitly.
        TestScheduler scheduler = Schedulers.test();
        AssertableSubscriber<Integer> subscriber = obs
            .repeatWhen(completed -> completed.delay(1, TimeUnit.MILLISECONDS, scheduler))
            .retryWhen(error -> error.delay(1, TimeUnit.MILLISECONDS, scheduler))
            .subscribeOn(scheduler)
            .test();

        // Time has not moved yet, so the source has not even been subscribed to.
        subscriber.assertNoValues();
        scheduler.advanceTimeBy(30, TimeUnit.SECONDS);
        subscriber.assertNoTerminalEvent();
        scheduler.advanceTimeBy(30, TimeUnit.DAYS);
        subscriber.assertNoTerminalEvent();
    }
}
Upvotes: 0
Reputation: 12097
When you call delay, which uses Schedulers.computation() by default, you are introducing asynchrony. Once activity starts occurring on a background thread, your test method returns and presumably your process exits. You need to use a blockingSubscribe or put a longish Thread.sleep at the end.
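To see the effect in isolation, here is a minimal plain-JDK sketch of the same situation. It deliberately does not use RxJava: a ScheduledExecutorService with a daemon thread stands in for the background scheduler, and a CountDownLatch stands in for the blocking subscribe. The class name AsyncWaitSketch and the method run are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration class, not part of RxJava.
final class AsyncWaitSketch {

    /**
     * Schedules `count` delayed "emissions" on a background daemon thread.
     * If `block` is true, the caller waits until all of them have arrived;
     * otherwise it returns immediately, like a test method that just ends.
     * Returns the values that had been observed when the caller returned.
     */
    static List<Integer> run(int count, long delayMillis, boolean block) {
        List<Integer> seen = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(count);
        // Daemon thread: it does not keep the JVM alive on its own.
        ScheduledExecutorService background =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "background-sketch");
                t.setDaemon(true);
                return t;
            });
        try {
            for (int i = 1; i <= count; i++) {
                final int value = i;
                // The i-th value is delivered after i * delayMillis.
                background.schedule(() -> {
                    seen.add(value);
                    done.countDown();
                }, delayMillis * i, TimeUnit.MILLISECONDS);
            }
            if (block) {
                // The "blocking subscribe": wait for the background work.
                done.await(5, TimeUnit.SECONDS);
            }
            return new ArrayList<>(seen);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return new ArrayList<>(seen);
        } finally {
            // Mimics the process exiting: pending work is discarded.
            background.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("without blocking: " + run(3, 200, false));
        System.out.println("with blocking:    " + run(3, 50, true));
    }
}
```

Without blocking, the caller returns before the first delayed value is due, so nothing is observed; with blocking, all three values arrive. The longer the delay, the less the non-blocking caller sees, which matches the behaviour described in the question.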
Upvotes: 2