Marian
Marian

Reputation: 170

RxJava concat when first Observable is empty

I have a local repository and a remote repository from where I fetch data. For the first time I want to fetch data from the remote repository, and afterwards cache it and fetch it from the local database. I'm using 2 Observables and the concat method to do this:

Observable.concat(localWeatherForecast, remoteWeatherForecast)
                .filter(weatherForecasts -> !weatherForecasts.isEmpty())
                .first();

For local observable I use this:

private Observable<List<WeatherForecast>> getAndCacheLocalWeather() {
    return mLocalDataSource.getWeather()
            .flatMap(new Func1<List<WeatherForecast>, Observable<List<WeatherForecast>>>() {
                @Override
                public Observable<List<WeatherForecast>> call(List<WeatherForecast> weatherEntries) {
                    return Observable.from(weatherEntries)
                            .doOnNext(entry -> mCachedWeather.add(entry))
                            .toList();
                }
            });
}

and for remote:

return mRemoteDataSource.getWeather()
            .flatMap(new Func1<List<WeatherForecast>, Observable<List<WeatherForecast>>>() {
                @Override
                public Observable<List<WeatherForecast>> call(List<WeatherForecast> weather) {
                    return Observable.from(weather).doOnNext(entry -> {
                        mLocalDataSource.saveWeatherEntry(entry);
                        mCachedWeather.add(entry);
                    }).toList();
                }
            })
            .doOnCompleted(() -> mCacheIsDirty = false);

and here is the subscription

Subscription subscription = mRepository
            .getWeather()
            .subscribeOn(mSchedulerProvider.computation())
            .observeOn(mSchedulerProvider.ui())
            .subscribe(
                    // onNext
                    this::processWeather,
                    // onError
                    throwable -> mWeatherView.showLoadingTasksError(),
                    // onCompleted
                    () -> mWeatherView.setLoadingIndicator(false));

It seems that the concat doesn't move to second observable (the remote one) even if the local repository is empty. If I reverse the order then it works (remote to be first in concat). I tried to remove the filter method / first(), but even so remote observable is not processed.

Any ideas why? Thanks!

Upvotes: 4

Views: 5098

Answers (2)

azizbekian
azizbekian

Reputation: 62189

concat() will wait until first Observable emits everything and later it will listen for emission of second Observable. If first Observable fails to emit anything, i.e. it is empty, then second Observable won't be regarded.

You can start your first stream with a junk item, and then skip the first emission of the stream:

Observable.concat(
            Observable.<Integer>empty().startWith(-1),
            Observable.fromArray(1, 2, 3))
        .skip(1)
        .test()
        .assertResult(1, 2, 3);

Now, the first Observable will emit -1 and nothing after that, which is ok for concat() operator to function properly. Later you are skipping the first emission which you are not interested in.

Upvotes: 1

Sergej Isbrecht
Sergej Isbrecht

Reputation: 4002

you could provide a timeout for the first observable. If the first observable never finishes, it will not switch over to the second one. Please have a look at the test-methods I provide:

The test-method 'never_false' will not produce any values and times out after 1000ms, because no values has been pushed.

For the method 'never_true' the first observable in concat will timeout after some time and will switch over to onComplete observable. Therefore the concat will switch over to the second observable and take the first element from this stream.

@Test
public void never_false() throws Exception {
    Observable<List<Integer>> never = Observable.never();

    Observable<List<Integer>> just = Observable.just(Arrays.asList(1, 2, 3));
    Observable<List<Integer>> concat = Observable.concat(never, just);

    boolean await = concat.firstElement().test().await(1000, TimeUnit.MILLISECONDS);

    assertThat(await).isTrue();
}

@Test
public void never_true() throws Exception {
    Observable<List<Integer>> never = Observable.<List<Integer>>never()
            .timeout(50, TimeUnit.MILLISECONDS)
            .onErrorResumeNext(Observable.empty());

    Observable<List<Integer>> just = Observable.just(Arrays.asList(1, 2, 3));

    TestObserver<List<Integer>> test = Observable.concat(never, just)
            .test()
            .await()
            .assertComplete()
            .assertNoErrors();

    List<Integer> collect = test.values().stream()
            .flatMap(Collection::stream)
            .collect(Collectors.toList());

    assertThat(collect).contains(1, 2, 3);
}

Upvotes: 3

Related Questions