Reputation: 15919
I want to write a kind of "black-box test" for a component that uses RxJava internally. It uses Retrofit, which returns an Observable, to make an HTTP call, and afterwards it uses .flatMap() to do further processing on the data retrieved from Retrofit. The idea is to give that component a Transformer for setting the schedulers on the Observable, like this:
class DefaultTransformer<T> implements Transformer<T, T> {
    public Observable<T> call(Observable<T> observable) {
        return observable.subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread());
    }
}
My component does something like this:
void execute(Transformer<T, T> transformer) {
    Observable<List<Team>> observable = retrofitApi.getLeague(leagueId, seasonId)
        .flatMap(new Func1<LeagueWrapper, Observable<List<Team>>>() {
            @Override public Observable<List<Team>> call(LeagueWrapper wrapper) {
                return Observable.just(wrapper.getLeague().getTeams());
            }
        });
    observable.compose(transformer);
    observable.subscribe(this);
}
In production I pass DefaultTransformer as the parameter, but for unit tests I want to pass a Transformer that runs on the same thread as the unit test, so that everything runs synchronously (not async).
I tried this:
class UnitTestTransformer<T> implements Transformer<T, T> {
    public Observable<T> call(Observable<T> observable) {
        return observable.subscribeOn(Schedulers.test()).observeOn(AndroidSchedulers.test());
    }
}
But it still runs async in my unit tests. I also tried Schedulers.immediate(). toBlocking() does not seem to be an option, because the result is no longer an Observable. Any idea what could be wrong?
Upvotes: 5
Views: 3370
Reputation: 634
I had a similar problem and solved it by using the class TestObserver (http://reactivex.io/RxJava/javadoc/rx/observers/TestObserver.html).
It has the following three methods giving access to the received events:
getOnCompletedEvents()
getOnErrorEvents()
getOnNextEvents()
I hope this helps you too, if you can inject your subscriber somehow. Here is an example of how I tested it:
TestObserver<MyModel> testObserver = new TestObserver<>();
myObservableSupplyingMethod().subscribe(testObserver);
assertThat(testObserver.getOnErrorEvents().size()).isEqualTo(0);
assertThat(testObserver.getOnNextEvents().get(0)).isNotNull();
...
Upvotes: 0
Reputation: 5940
If changing the pattern of how execute() is called is not an option, you may want to try using the RxJava plugin mechanism:
https://github.com/ReactiveX/RxJava/wiki/Plugins
You can provide:
RxJavaSchedulersHook, to override the schedulers that are provided during test execution and get them to execute synchronously
RxJavaObservableExecutionHook, to hook into the Observable execution pipeline and use some kind of synchronization method (like a CountDownLatch) to wait for the Observable subscriptions to finish before continuing
Upvotes: 3
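To illustrate the latch idea from the last answer, here is a minimal sketch that uses only the JDK, not RxJava. The names LatchSketch, loadTeamsAsync and loadTeamsAndWait are hypothetical, and the executor merely stands in for whatever background scheduler the real component uses. It only shows the waiting pattern: the test thread blocks on a CountDownLatch until the asynchronous callback has delivered its result.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

class LatchSketch {

    // Hypothetical stand-in for a component that delivers its result on another thread.
    static void loadTeamsAsync(ExecutorService executor, Consumer<List<String>> callback) {
        executor.submit(() -> callback.accept(Arrays.asList("Team A", "Team B")));
    }

    // Blocks the calling (test) thread until the callback has fired or the timeout expires.
    static List<String> loadTeamsAndWait(long timeoutMillis) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch done = new CountDownLatch(1);
        AtomicReference<List<String>> result = new AtomicReference<>();
        try {
            loadTeamsAsync(executor, teams -> {
                result.set(teams);   // store the value before releasing the waiting thread
                done.countDown();
            });
            if (!done.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
                throw new IllegalStateException("Timed out waiting for the async result");
            }
            return result.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            throw new IllegalStateException("Interrupted while waiting", e);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(loadTeamsAndWait(1000));
    }
}
```

In a real test the countDown() call would sit in whatever hook or subscriber observes completion. Using a timeout on await() keeps a broken pipeline from hanging the test run forever.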