tyronen
tyronen

Reputation: 2568

RxJava: puzzling behavior

I want methods in my class to run some code on an IO thread, but only once a Subject they subscribe to has a certain value. Then the caller should get a response on the Android UI thread.

Something like this:

public class MyClass {

  private final Subject<Boolean, Boolean> subject;
  private final OtherClass otherObject;

  public MyClass(Subject<Boolean, Boolean> subject,
      OtherClass otherObject) {
    this.subject = subject;
    this.otherObject = otherObject;
  }

  public Observable<String> myMethod() {
    return waitForTrue(() -> otherObject.readFromDisk());
  }

  private <T> Observable<T> waitForTrue(Callable<T> callable) {
    return subject
        .first(value -> value)
        .flatMap(value -> Observable.fromCallable(callable))
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread());
  }
}

Does this work? Not sure, so I wrote up a set of unit tests to check them. I found that my test methods, though they always worked when run one by one, would fail as part of a suite.

In fact, I found if I put the very same test twice, it would pass the first time, but fail the second!

public class MyClassTest {

  private TestScheduler ioScheduler;
  private TestScheduler androidScheduler;
  private TestSubscriber<String> testSubscriber;
  private MyClass objectUnderTest;

  @Before public void setup() {
    ioScheduler = new TestScheduler();
    androidScheduler = new TestScheduler();
    testSubscriber = new TestSubscriber<>();
    RxJavaHooks.reset();
    RxJavaHooks.setOnIOScheduler(scheduler -> ioScheduler);
    RxAndroidPlugins.getInstance().reset();
    RxAndroidPlugins.getInstance().registerSchedulersHook(
        new RxAndroidSchedulersHook() {
          @Override public Scheduler getMainThreadScheduler() {
            return androidScheduler;
          };
        });
    Subject<Boolean, Boolean> subject = BehaviorSubject.create(true);
    MyClass.OtherClass otherClass = mock(MyClass.OtherClass.class);
    when(otherClass.readFromDisk()).thenReturn("mike");;
    objectUnderTest = new MyClass(subject, otherClass);
  };

  @Test public void firstTest() {
    objectUnderTest.myMethod().subscribe(testSubscriber);
    ioScheduler.advanceTimeBy(1, TimeUnit.SECONDS);
    androidScheduler.advanceTimeBy(1, TimeUnit.SECONDS);
    testSubscriber.assertValueCount(1);
    // This passes
  };

  @Test public void secondTest() {
    firstTest();
    // This fails!
  };
}

Why is this happening? And is the bug in the class under test, or the test code?

I thought it might be an issue with using RxJava 1.x, but I had a similar issue with RxJava 2.x.

EDIT: The tests failed because of a missing line in the test code. You have to put this in the setup method:

AndroidSchedulers.reset()

because the hook is only ever called once, by the static initializer of the AndroidSchedulers class.

Upvotes: 0

Views: 62

Answers (1)

akarnokd
akarnokd

Reputation: 69997

subscribeOn has no practical effect on a Subject because they don't have a subscription side-effect to move off to another thread. Therefore, when they get a new item, they notify their consumers on the caller thread. Moving an item to another thread should be done via observeOn:

private <T> Observable<T> waitForTrue(Callable<T> callable) {
    return subject
    .filter(value -> value)
    .take(1)
    .observeOn(Schedulers.io())
    .map(value -> callable.call())
    .observeOn(AndroidSchedulers.mainThread());
}

Also you don't need flatMap just to execute the callable, map is enough.

Upvotes: 1

Related Questions