Reputation: 2832
I'm newish to RxJava and have a question.
While learning the beast that is RxJava, I wrote the following class, which is my class under test.
public class PollingLoop {

    public static <T> Observable<T> buildObservable(
            final int interval,
            final TimeUnit timeUnit,
            final int maxJitter,
            final Scheduler scheduler,
            final Supplier<Observable<T>> scheduledTask) {
        if (maxJitter <= 0) throw new IllegalArgumentException("Jitter must be greater than 0");
        final Random randomJitter = new Random();
        return Observable.timer(interval, timeUnit, scheduler)
                .map(x -> {
                    System.out.println("Flat map jitter");
                    return randomJitter.nextInt(maxJitter);
                })
                .flatMap(jitter -> {
                    System.out.println("Flat map timer");
                    return Observable.timer(jitter, timeUnit, scheduler);
                })
                .flatMap(ignored -> {
                    System.out.println("Flat map task");
                    return scheduledTask.get();
                })
                .retry()
                .repeat();
    }

    public static <T> Completable buildCompletable(
            final int interval,
            final TimeUnit timeUnit,
            final int maxJitter,
            final Scheduler scheduler,
            final Supplier<Completable> scheduledTask) {
        if (maxJitter <= 0) throw new IllegalArgumentException("Jitter must be greater than 0");
        final Random randomJitter = new Random();
        return Observable.timer(interval, timeUnit, scheduler)
                .map(x -> {
                    System.out.println("Flat map jitter");
                    return randomJitter.nextInt(maxJitter);
                })
                .flatMapCompletable(jitter -> {
                    System.out.println("Flat map timer");
                    return Completable.timer(jitter, timeUnit, scheduler);
                })
                .flatMapCompletable(ignored -> {
                    System.out.println("Flat map task that is not called");
                    return scheduledTask.get();
                })
                .retry()
                .repeat()
                .toCompletable();
    }
}
When I test the delayed execution of the Observable, I get this output:
Flat map jitter
Flat map timer
Flat map task //(observable is being called)
But when I test the delayed execution of the Completable, I get this output:
Flat map jitter
Flat map timer
//(The completable task is not being called)
What am I doing wrong? Why is the Completable task not being called from within buildCompletable?
Here are the tests (written in Spock):
def 'should delay execution of observable'() {
    given:
    def subscriber = new TestSubscriber<>()
    def scheduler = new TestScheduler()
    def supplier = Mock Supplier
    supplier.get() >> Observable.just(true)

    when:
    PollingLoop.buildObservable(100, TimeUnit.MILLISECONDS, 1, scheduler, supplier).subscribe(subscriber)
    scheduler.advanceTimeBy(101, TimeUnit.MILLISECONDS)

    then:
    subscriber.assertValueCount(1)
    subscriber.assertValue(true)
}
def 'should delay execution of completable'() {
    given:
    def subscriber = new TestSubscriber<>()
    def scheduler = new TestScheduler()
    def supplier = Mock Supplier
    supplier.get() >> Completable.complete()

    when:
    PollingLoop.buildCompletable(100, TimeUnit.MILLISECONDS, 1, scheduler, supplier).subscribe(subscriber)
    scheduler.advanceTimeBy(1001, TimeUnit.MILLISECONDS)

    then:
    1 * supplier.get()
}
Upvotes: 1
Views: 652
Reputation: 8227
The result of your first flatMapCompletable() behaves as a completable, as that is what you are returning. However, a completable never emits a value (by definition), so there is no value for the subsequent flatMapCompletable() to map, and its function is never called.
Since your first Completable doesn't emit a value, you will need to bind the next step using the andThen() operator or something similar.
Your code compiles because the flatMapCompletable() operator returns an Observable rather than a Completable, so a second flatMapCompletable() can legally be chained onto it. You will need to put the andThen() operator inside the function passed to flatMapCompletable().
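As a rough sketch of that suggestion (untested here, and assuming RxJava 1.2.x, where Observable.flatMapCompletable() and Completable.andThen() are available), buildCompletable could chain the task onto the jitter timer inside a single flatMapCompletable(). The class name PollingLoopSketch and the use of Completable.defer() are my own additions, not part of the original code; defer() is there so that scheduledTask.get() is only called once the jitter delay has elapsed, rather than at the moment the mapping function runs.

```java
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

import rx.Completable;
import rx.Observable;
import rx.Scheduler;

// Hypothetical variant of the question's PollingLoop (RxJava 1.x assumed).
final class PollingLoopSketch {

    static Completable buildCompletable(
            final int interval,
            final TimeUnit timeUnit,
            final int maxJitter,
            final Scheduler scheduler,
            final Supplier<Completable> scheduledTask) {
        if (maxJitter <= 0) throw new IllegalArgumentException("Jitter must be greater than 0");
        final Random randomJitter = new Random();
        return Observable.timer(interval, timeUnit, scheduler)
                // The timer emits exactly one item, so this map runs once per cycle.
                .map(x -> randomJitter.nextInt(maxJitter))
                // One flatMapCompletable only: the jitter delay and the task are
                // sequenced with andThen() instead of a second flatMapCompletable(),
                // which would never receive an item to map.
                .flatMapCompletable(jitter ->
                        Completable.timer(jitter, timeUnit, scheduler)
                                // defer() postpones scheduledTask.get() until the
                                // jitter timer has completed.
                                .andThen(Completable.defer(() -> scheduledTask.get())))
                .retry()
                .repeat()
                .toCompletable();
    }
}
```

With the Spock test from the question, supplier.get() should then be reached once the interval and the jitter have elapsed on the TestScheduler, because the task is now part of the Completable returned by the first (and only) flatMapCompletable().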
Upvotes: 1