Gleichmut
Gleichmut

Reputation: 6989

Andorid rxJava: how to get data from cache and and the same time update it in the background?

I just start learning rxJava for Android and want to implement the common use case:

Traditionally on of the best scenarios was use CursorLoader to get data from cache, run web request in the separate thread and save data to the disk via content provider, content provider automatically notify the listener and CursorLoader autoupdate UI.

In rxJava I can do it by running two different Observers as you can see in code below, but I don't find the way how to combine this two calls into the one to reach my aim. Googling shows this thread but it looks like it just get data from the cache or data from the web server, but don't do both RxJava and Cached Data

Code snippet:

@Override
public Observable<SavingsGoals> getCachedSavingsGoal() {
    return observableGoal.getSavingsGoals()
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread());
}

@Override
public Observable<SavingsGoals> getRecentSavingsGoal() {
    return api.getSavingsGoals()
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread());
}

    model.getCachedSavingsGoal().subscribe(new Observer<SavingsGoals>() {
        @Override
        public void onCompleted() {
            // no op
        }

        @Override
        public void onError(Throwable e) {
            Log.e(App.TAG, "Failed to consume cached data");
            view.showError();
        }

        @Override
        public void onNext(SavingsGoals savingsGoals) {
            Log.d(App.TAG, "Show the next item");
            if (savingsGoals != null && !savingsGoals.getSavingsGoals().isEmpty()) {
                view.showData(savingsGoals.getSavingsGoals());
            } else {
                view.showError();
            }
        }
    });

    model.getRecentSavingsGoal().subscribe(new Observer<SavingsGoals>() {
        @Override
        public void onCompleted() {
            // no op
        }

        @Override
        public void onError(Throwable e) {
            Log.e(App.TAG, "Failed to consume data from the web", e);
            view.showError();
        }

        @Override
        public void onNext(SavingsGoals savingsGoals) {
            if (savingsGoals != null && !savingsGoals.getSavingsGoals().isEmpty()) {
                view.showData(savingsGoals.getSavingsGoals());
            } else {
                view.showError();
            }
        }
    });

Also, the one of issues with current approach is cache and web data are not garranted to be run sequently. It is possible when outdated data will come as latest and override recent from web.

To solve this issue I implemented Observer merge with filtration by timestamp: it get data from cache, pass it to the next observer and if cache is outdated fire new call to the web - case for thread competition solved by the filtration with timestamps. However, the issue with this approach I can not return cache data from this Observable - I need to wait when both requests finish their work.

Code snippet.

    @Override
public Observable<Timestamped<SavingsGoals>> getSavingGoals() {
    return observableGoal
            .getTimestampedSavingsGoals()
            .subscribeOn(Schedulers.io())
            .flatMap(new Func1<Timestamped<SavingsGoals>, Observable<Timestamped<SavingsGoals>>>() {
                @Override
                public Observable<Timestamped<SavingsGoals>> call(Timestamped<SavingsGoals> cachedData) {
                    Log.d(App.FLOW, "getTimestampedSavingsGoals");
                    return getGoalsFromBothSources()
                            .filter(filterResponse(cachedData));
                }
            })
            .subscribeOn(AndroidSchedulers.mainThread());
}

private Func1<Timestamped<SavingsGoals>, Boolean> filterResponse(Timestamped<SavingsGoals> cachedData) {
    return new Func1<Timestamped<SavingsGoals>, Boolean>() {
        @Override
        public Boolean call(Timestamped<SavingsGoals> savingsGoals) {
            return savingsGoals != null
                    && cachedData != null
                    && cachedData.getTimestampMillis() < savingsGoals.getTimestampMillis()
                    && savingsGoals.getValue().getSavingsGoals().size() != 0;
        }
    };
}

private Observable<Timestamped<SavingsGoals>> getGoalsFromBothSources() {
    Log.d(App.FLOW, "getGoalsFromBothSources:explicit");
    return Observable.merge(
            observableGoal.getTimestampedSavingsGoals().subscribeOn(Schedulers.io()),
            api.getSavingsGoals()
                    .timestamp()
                    .flatMap(new Func1<Timestamped<SavingsGoals>, Observable<Timestamped<SavingsGoals>>>() {
                        @Override
                        public Observable<Timestamped<SavingsGoals>> call(Timestamped<SavingsGoals> savingsGoals) {
                            Log.d(App.FLOW, "getGoalsFromBothSources:implicit");
                            return observableGoal.saveAllWithTimestamp(savingsGoals.getTimestampMillis(), savingsGoals.getValue().getSavingsGoals());
                        }
                    }))
                    .subscribeOn(Schedulers.io());
}

Do you know the approach to do this in one Observer?

Potential solution:

@Override
public Observable<SavingsGoals> getSavingGoals() {
    return api.getSavingsGoals()
            .publish(network ->
                    Observable.mergeDelayError(
                            observableGoal.getSavingsGoals().takeUntil(network),
                            network.flatMap(new Func1<SavingsGoals, Observable<SavingsGoals>>() {
                                @Override
                                public Observable<SavingsGoals> call(SavingsGoals savingsGoals) {
                                    return observableGoal.saveAll(savingsGoals.getSavingsGoals());
                                }
                            })
                    )
            )
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread());
}

Upvotes: 1

Views: 3374

Answers (2)

Vincent Paing
Vincent Paing

Reputation: 2475

For first question : You can save the result from Network Result by using doOnNext Method, It would looks something like this

public Observable<NetworkResponse> getDataFromNetwork(
      final Request request) {
    return networkCall.doOnNext(networkResponse -> saveToStorage(networkResponse);
  }

Now to combine the two results from both Storage and Online, the best way is to combine with publish and merge. I recommend watching this talk. The code would look something like this

  public Observable<Response> getData(final Request request) {

    return dataService.getDataFromNetwork(request)
        .publish(networkResponse ->  Observable.merge(networkResponse, dataService.getDataFromStorage(request).takeUntil(networkResponse)));
  }

Why use publish and merge you my ask? publish method makes the response accessible in the callback. takeUntil means that you will take the data from storage but you will stop it IF for some reason, network call is finished before accessing storage data is finished. This way, you can be sure that new data from network is always shown even if it's finished before getting old data from storage.

The last but not least, in your subscriber OnNext just add the items to the list. (list.clear and list.addAll) Or similar functions or in you case view.showData()

EDIT: For The call getting disrupted when there's an error from network, add onErrorResumeNext at the end.

  public Observable<Response> getData(final Request request) {

    return dataService.getDataFromNetwork(request)
        .publish(networkResponse ->  Observable.merge(networkResponse, dataService.getDataFromStorage(request).takeUntil(networkResponse)))
        .onErrorResumeNext(dataService.getDataFromStorage(request);
  }

Upvotes: 4

borichellow
borichellow

Reputation: 1001

I'd recommend to "listen" only to local data, and refresh it when API response came. Let say for getting local data you have something like:

@Nonnull
public Observable<SomeData> getSomeDataObservable() {
    return Observable
            .defer(new Func0<Observable<SomeData>>() {
                @Override
                public Observable<SomeData> call() {
                    return Observable.just(getSomeData());
                }
            });
}

So you need to add PublishSubject that will emit every time, when local data was updated (refreshSubject):

@Nonnull
public Observable<SomeData> getSomeDataObservableRefreshable() {
    return refreshSubject.startWith((Object)null).switchMap(new Func1() {
        public Observable<T> call(Object o) {
            return getSomeDataObservable();
        }
    }     
}

Now you need to subscribe only to getSomeDataObservableRefreshable(), and each time when data came from API, you update it and make refreshSubject .onNext(new Object())

Also i'd recommend to take a look to rx-java-extensions lib, it has alot of "cool tools" for RxAndroid. For example solution for your problem would be:

@Nonnull
public Observable<SomeData> getSomeDataObservable() {
    return Observable
            .defer(new Func0<Observable<SomeData>>() {
                @Override
                public Observable<SomeData> call() {
                    return Observable.just(getSomeData());
                }
            })
            .compose(MoreOperators.<SomeData>refresh(refreshSubject));
}

Upvotes: 0

Related Questions