sockeqwe

Reputation: 15919

RxJava subscribe and observe on same thread as unit test

I want to write a kind of "black-box test" for a component that uses RxJava internally.

Internally it uses Retrofit, which returns an Observable for the HTTP call, and then uses .flatMap() for further processing of the data retrieved from Retrofit. The idea is to give that component a Transformer that sets the schedulers on the Observable, like this:

class DefaultTransformer <T> implements Transformer<T, T> {

   public Observable<T> call(Observable<T> observable) { 
      return observable.subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread());
   }
}

My Component does something like this:

void execute(Transformer<T, T> transformer){
     Observable<List<Team>> observable = retrofitApi.getLeague(leagueId, seasonId)
        .flatMap(new Func1<LeagueWrapper, Observable<List<Team>>>() {
          @Override public Observable<List<Team>> call(LeagueWrapper wrapper) {                
             return Observable.just(wrapper.getLeague().getTeams());
          }
        });

   observable.compose(transformer);

   observable.subscribe(this);
}

In production I pass DefaultTransformer as the parameter, but for unit tests I want to pass a Transformer that runs on the same thread as the unit test, so that everything runs synchronously (not async).

I tried that:

class UnitTestTransformer <T> implements Transformer<T, T> {

   public Observable<T> call(Observable<T> observable) {
      return observable.subscribeOn(Schedulers.test()).observeOn(AndroidSchedulers.test());
   }
}

But it still runs async in my unit tests. I also tried Schedulers.immediate(). toBlocking() does not seem to be an option, because the result is no longer an Observable. Any idea what could be wrong?

Upvotes: 5

Views: 3370

Answers (2)

Maximosaic

Reputation: 634

I had a similar problem and solved it by using the class TestObserver (http://reactivex.io/RxJava/javadoc/rx/observers/TestObserver.html).

It has the following three methods giving access to the received events:

getOnCompletedEvents()
getOnErrorEvents()
getOnNextEvents()

I hope this helps you too, if you can inject your subscriber somehow. Here is an example of how I tested it:

TestObserver<MyModel> testObserver = new TestObserver<>();
myObservableSupplyingMethod().subscribe(testObserver);

assertThat(testObserver.getOnErrorEvents().size()).isEqualTo(0);
assertThat(testObserver.getOnNextEvents().get(0)).isNotNull();
...

Upvotes: 0

Ross Hambrick

Reputation: 5940

If changing the pattern of how execute() is called is not an option, you may want to try using the RxJava Plugin mechanism.

https://github.com/ReactiveX/RxJava/wiki/Plugins

You can provide:

  • RxJavaSchedulersHook to override the schedulers that are provided during test execution and get them to execute synchronously
  • RxJavaObservableExecutionHook to hook into the Observable execution pipeline and use some kind of synchronization mechanism (like a CountDownLatch) to wait for the Observable subscriptions to finish before continuing
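
To make the two ideas above concrete, here is a minimal sketch that uses only java.util.concurrent rather than RxJava itself. It is an analogy, not the plugin API: the Executor stands in for an RxJava Scheduler, and the names SyncTestingSketch, loadTeams, loadWithDirectExecutor and loadWithLatch are hypothetical. The first helper swaps in a same-thread executor (roughly what overriding the schedulers achieves); the second keeps the work asynchronous and waits on a CountDownLatch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class SyncTestingSketch {

    /** Hypothetical stand-in for the component: it runs its work on whatever Executor it is given. */
    static void loadTeams(Executor executor, Consumer<List<String>> callback) {
        executor.execute(() -> {
            List<String> teams = new ArrayList<>();
            teams.add("Team A");
            teams.add("Team B");
            callback.accept(teams);
        });
    }

    /** Idea 1: swap in a same-thread executor, so the callback fires before loadTeams returns. */
    static List<String> loadWithDirectExecutor() {
        List<String> received = new ArrayList<>();
        Executor direct = Runnable::run; // runs each task on the calling thread
        loadTeams(direct, received::addAll);
        return received;
    }

    /** Idea 2: keep the work asynchronous and block on a latch until the callback has fired. */
    static List<String> loadWithLatch() {
        List<String> received = new ArrayList<>();
        CountDownLatch done = new CountDownLatch(1);
        Executor background = task -> new Thread(task).start();
        loadTeams(background, teams -> {
            received.addAll(teams);
            done.countDown(); // signal the waiting test thread
        });
        try {
            // A timeout keeps a broken test from hanging forever.
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("callback did not fire in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(loadWithDirectExecutor());
        System.out.println(loadWithLatch());
    }
}
```

The direct-executor variant is usually simpler to reason about in a test, since nothing leaves the test thread; the latch variant is closer to production behaviour but needs a timeout.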

Upvotes: 3
