Reputation: 2826
So I'm working on an older project which was developed by some other guys, and as it happens RxAndroid is used in the project.
I had to add caching handling in some of the app's requests and the OkHttp cache wasn't doing it well as I needed it. So i came up with the solution to implement my own service which checks for internet connection and then makes the request, or reads it from cache depending on the connection. I don't have experience with RxAndroid but I read a bit and I came up with this:
@Override
public Observable<List<SocialNews>> getStream(@Path("stream") String stream, @Query("networks") String networks, @Query("page") Integer page, @Query("limit") Integer limit) {
return Observable.create(new AsyncOnSubscribe<Object, List<SocialNews>>() {
@Override
protected Object generateState() {
return null;
}
@Override
protected Object next(Object state, long requested, Observer<Observable<? extends List<SocialNews>>> observer) {
if (NetworkUtils.hasActiveInternetConnection(context)) {
observer.onNext(APIClient.getService(context).getStream(stream, networks, page, limit));
} else {
observer.onNext(Observable.just(NetworkUtils.getCachedStream()));
}
return state;
}
});
}
And I subscribe on it like this:
final Observable<List<SocialNews>> observable = APIClient.getCacheService(getActivity()).getStream(stream, networks, page, SportFiveAPI.DEFAULT_ITEM_LIMIT);
LifecycleObservable.bindFragmentLifecycle(lifecycle(),
AppObservable.bindFragment(this, observable)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread()))
.subscribeOn(Schedulers.io())
.subscribe(
socialNews -> {
// do stuff
},
error -> {
// do stuff
}
);
But somehow it gets stuck in a loop and keeps calling the next method inside my AsyncOnSubscribe. Anyone knows what I'm doing wrong here?
Upvotes: 1
Views: 592
Reputation: 3990
It is much better to use Observable.fromCallable
that Observable.create
, exactly to avoid the kind of violation of the Observable
contract that you have in your code - namely not calling onCompleted
. I recommend something like this:
@Override
public Observable<List<SocialNews>> getStream(@Path("stream") String stream, @Query("networks") String networks, @Query("page") Integer page, @Query("limit") Integer limit) {
Observable.fromCallable(() -> NetworkUtils.hasActiveInternetConnection(context))
.flatMap(hasConnection -> {
if (hasConnection) {
return APIClient.getService(context).getStream(stream, networks, page, limit);
} else {
return Observable.just(NetworkUtils.getCachedStream())
}
})
}
Also, there is a really nice pattern for doing this kind of fallback-type data retrieval using concat
and first
. See this blog post from Dan Lew for details.
Upvotes: 2