Reputation: 170
I have a local repository and a remote repository that I fetch data from. The first time, I want to fetch the data from the remote repository; afterwards it should be cached and served from the local database. I'm using two Observables and the concat method to do this:
Observable.concat(localWeatherForecast, remoteWeatherForecast)
        .filter(weatherForecasts -> !weatherForecasts.isEmpty())
        .first();
For the local observable I use this:
private Observable<List<WeatherForecast>> getAndCacheLocalWeather() {
    return mLocalDataSource.getWeather()
            .flatMap(new Func1<List<WeatherForecast>, Observable<List<WeatherForecast>>>() {
                @Override
                public Observable<List<WeatherForecast>> call(List<WeatherForecast> weatherEntries) {
                    return Observable.from(weatherEntries)
                            .doOnNext(entry -> mCachedWeather.add(entry))
                            .toList();
                }
            });
}
And for the remote one:
return mRemoteDataSource.getWeather()
        .flatMap(new Func1<List<WeatherForecast>, Observable<List<WeatherForecast>>>() {
            @Override
            public Observable<List<WeatherForecast>> call(List<WeatherForecast> weather) {
                return Observable.from(weather).doOnNext(entry -> {
                    mLocalDataSource.saveWeatherEntry(entry);
                    mCachedWeather.add(entry);
                }).toList();
            }
        })
        .doOnCompleted(() -> mCacheIsDirty = false);
And here is the subscription:
Subscription subscription = mRepository
        .getWeather()
        .subscribeOn(mSchedulerProvider.computation())
        .observeOn(mSchedulerProvider.ui())
        .subscribe(
                // onNext
                this::processWeather,
                // onError
                throwable -> mWeatherView.showLoadingTasksError(),
                // onCompleted
                () -> mWeatherView.setLoadingIndicator(false));
It seems that concat doesn't move on to the second observable (the remote one), even if the local repository is empty. If I reverse the order (remote first in concat), it works. I tried removing the filter method and first(), but the remote observable is still not processed.
Any ideas why? Thanks!
Upvotes: 4
Views: 5098
Reputation: 62189
concat() waits until the first Observable has emitted everything and completed, and only then starts listening to the second Observable. If the first Observable never completes, the second Observable is never reached.
You can start your first stream with a junk item, and then skip the first emission of the stream:
Observable.concat(
        Observable.<Integer>empty().startWith(-1),
        Observable.fromArray(1, 2, 3))
        .skip(1)
        .test()
        .assertResult(1, 2, 3);
Now the first Observable emits -1 and nothing after that, which is fine for the concat() operator. Afterwards you skip the first emission, which you are not interested in.
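The ordering in the snippet above can also be modeled without RxJava. This is only a rough sketch using java.util.stream. The class and helper names are hypothetical, and a Java Stream always finishes, so it says nothing about a source that never completes. It shows just the sequence of steps: prepend a sentinel, concatenate, then drop the sentinel.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class SentinelConcatSketch {

    // Hypothetical helper: mirrors startWith(-1) + concat + skip(1) with plain streams.
    static List<Integer> concatWithSentinel(Stream<Integer> first, Stream<Integer> second) {
        // Put the junk item in front of the first source (the startWith(-1) step).
        Stream<Integer> guarded = Stream.concat(Stream.of(-1), first);
        // Append the second source, then drop the junk item again (the skip(1) step).
        return Stream.concat(guarded, second)
                .skip(1)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // The first source is empty, so only the second source's items remain.
        System.out.println(concatWithSentinel(Stream.<Integer>empty(), Stream.of(1, 2, 3)));
    }
}
```

As far as the documented contract goes, concat() subscribes to the next source once the previous one has completed, so whether the sentinel makes a difference depends on whether the first source actually completes.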
Upvotes: 1
Reputation: 4002
You could provide a timeout for the first observable. If the first observable never completes, concat will not switch over to the second one. Please have a look at the test methods below:
The test method 'never_false' does not produce any values and the wait times out after 1000 ms, because no value has been pushed.
In the test method 'never_true', the first observable in concat times out after 50 ms, and the timeout error is replaced with an empty observable that simply completes. concat therefore switches over to the second observable and emits its list.
@Test
public void never_false() throws Exception {
    Observable<List<Integer>> never = Observable.never();
    Observable<List<Integer>> just = Observable.just(Arrays.asList(1, 2, 3));
    Observable<List<Integer>> concat = Observable.concat(never, just);
    // await(...) returns false when the wait times out without a terminal event
    boolean await = concat.firstElement().test().await(1000, TimeUnit.MILLISECONDS);
    assertThat(await).isFalse();
}
@Test
public void never_true() throws Exception {
    Observable<List<Integer>> never = Observable.<List<Integer>>never()
            .timeout(50, TimeUnit.MILLISECONDS)
            .onErrorResumeNext(Observable.empty());
    Observable<List<Integer>> just = Observable.just(Arrays.asList(1, 2, 3));
    TestObserver<List<Integer>> test = Observable.concat(never, just)
            .test()
            .await()
            .assertComplete()
            .assertNoErrors();
    List<Integer> collect = test.values().stream()
            .flatMap(Collection::stream)
            .collect(Collectors.toList());
    assertThat(collect).contains(1, 2, 3);
}
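For comparison, the same "give the local source a deadline, then fall back" idea can be sketched with the JDK alone. This is a rough analogy, not RxJava: it assumes Java 9+ for CompletableFuture.completeOnTimeout, and the class name, the helper name, and its timeoutMillis parameter are made up for illustration. A future that is never completed plays the role of Observable.never().

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class TimeoutFallbackSketch {

    // Hypothetical helper: wait up to timeoutMillis for the local result,
    // and use the remote result if the local one is late or empty.
    static List<Integer> localThenRemote(CompletableFuture<List<Integer>> local,
                                         CompletableFuture<List<Integer>> remote,
                                         long timeoutMillis) {
        // If local has not completed in time, complete it with an empty list
        // (roughly what timeout(...) + onErrorResumeNext(Observable.empty()) achieve).
        List<Integer> localItems = local
                .completeOnTimeout(Collections.emptyList(), timeoutMillis, TimeUnit.MILLISECONDS)
                .join();
        // An empty local result means "nothing cached", so fall through to remote.
        return localItems.isEmpty() ? remote.join() : localItems;
    }

    public static void main(String[] args) {
        // A future nobody completes, standing in for a source that never finishes.
        CompletableFuture<List<Integer>> never = new CompletableFuture<>();
        CompletableFuture<List<Integer>> remote =
                CompletableFuture.completedFuture(Arrays.asList(1, 2, 3));
        System.out.println(localThenRemote(never, remote, 50));
    }
}
```

The design choice is the same as in 'never_true': the deadline turns "never finishes" into "finished with nothing", which is what lets the fallback run.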
Upvotes: 3