Reputation: 2568
I want methods in my class to run some code on an IO thread, but only once a Subject they subscribe to has a certain value. Then the caller should get a response on the Android UI thread.
Something like this:
public class MyClass {
private final Subject<Boolean, Boolean> subject;
private final OtherClass otherObject;
public MyClass(Subject<Boolean, Boolean> subject,
OtherClass otherObject) {
this.subject = subject;
this.otherObject = otherObject;
}
public Observable<String> myMethod() {
return waitForTrue(() -> otherObject.readFromDisk());
}
private <T> Observable<T> waitForTrue(Callable<T> callable) {
return subject
.first(value -> value)
.flatMap(value -> Observable.fromCallable(callable))
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
}
}
Does this work? Not sure, so I wrote up a set of unit tests to check them. I found that my test methods, though they always worked when run one by one, would fail as part of a suite.
In fact, I found if I put the very same test twice, it would pass the first time, but fail the second!
public class MyClassTest {
private TestScheduler ioScheduler;
private TestScheduler androidScheduler;
private TestSubscriber<String> testSubscriber;
private MyClass objectUnderTest;
@Before public void setup() {
ioScheduler = new TestScheduler();
androidScheduler = new TestScheduler();
testSubscriber = new TestSubscriber<>();
RxJavaHooks.reset();
RxJavaHooks.setOnIOScheduler(scheduler -> ioScheduler);
RxAndroidPlugins.getInstance().reset();
RxAndroidPlugins.getInstance().registerSchedulersHook(
new RxAndroidSchedulersHook() {
@Override public Scheduler getMainThreadScheduler() {
return androidScheduler;
};
});
Subject<Boolean, Boolean> subject = BehaviorSubject.create(true);
MyClass.OtherClass otherClass = mock(MyClass.OtherClass.class);
when(otherClass.readFromDisk()).thenReturn("mike");;
objectUnderTest = new MyClass(subject, otherClass);
};
@Test public void firstTest() {
objectUnderTest.myMethod().subscribe(testSubscriber);
ioScheduler.advanceTimeBy(1, TimeUnit.SECONDS);
androidScheduler.advanceTimeBy(1, TimeUnit.SECONDS);
testSubscriber.assertValueCount(1);
// This passes
};
@Test public void secondTest() {
firstTest();
// This fails!
};
}
Why is this happening? And is the bug in the class under test, or the test code?
I thought it might be an issue with using RxJava 1.x, but I had a similar issue with RxJava 2.x.
EDIT: The tests failed because of a missing line in the test code. You have to put this in the setup method:
AndroidSchedulers.reset()
because the hook is only ever called once, by the static initializer of the AndroidSchedulers
class.
Upvotes: 0
Views: 62
Reputation: 69997
subscribeOn
has no practical effect on a Subject
because they don't have a subscription side-effect to move off to another thread. Therefore, when they get a new item, they notify their consumers on the caller thread. Moving an item to another thread should be done via observeOn
:
private <T> Observable<T> waitForTrue(Callable<T> callable) {
return subject
.filter(value -> value)
.take(1)
.observeOn(Schedulers.io())
.map(value -> callable.call())
.observeOn(AndroidSchedulers.mainThread());
}
Also you don't need flatMap
just to execute the callable
, map is enough.
Upvotes: 1