Drilon Blakqori
Drilon Blakqori

Reputation: 2826

RxAndroid observable looping

So I'm working on an older project which was developed by some other guys, and as it happens RxAndroid is used in the project.
I had to add caching handling in some of the app's requests and the OkHttp cache wasn't doing it well as I needed it. So i came up with the solution to implement my own service which checks for internet connection and then makes the request, or reads it from cache depending on the connection. I don't have experience with RxAndroid but I read a bit and I came up with this:

@Override
public Observable<List<SocialNews>> getStream(@Path("stream") String stream, @Query("networks") String networks, @Query("page") Integer page, @Query("limit") Integer limit) {
    return Observable.create(new AsyncOnSubscribe<Object, List<SocialNews>>() {
        @Override
        protected Object generateState() {
            return null;
        }

        @Override
        protected Object next(Object state, long requested, Observer<Observable<? extends List<SocialNews>>> observer) {
            if (NetworkUtils.hasActiveInternetConnection(context)) {
                observer.onNext(APIClient.getService(context).getStream(stream, networks, page, limit));
            } else {
                observer.onNext(Observable.just(NetworkUtils.getCachedStream()));
            }
            return state;
        }
    });
}

And I subscribe on it like this:

 final Observable<List<SocialNews>> observable = APIClient.getCacheService(getActivity()).getStream(stream, networks, page, SportFiveAPI.DEFAULT_ITEM_LIMIT);
    LifecycleObservable.bindFragmentLifecycle(lifecycle(),
            AppObservable.bindFragment(this, observable)
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread()))
            .subscribeOn(Schedulers.io())
            .subscribe(
                    socialNews -> {
                        // do stuff
                    },
                    error -> {
                        // do stuff
                    }
            );

But somehow it gets stuck in a loop and keeps calling the next method inside my AsyncOnSubscribe. Anyone knows what I'm doing wrong here?

Upvotes: 1

Views: 592

Answers (2)

kenny_k
kenny_k

Reputation: 3990

It is much better to use Observable.fromCallable that Observable.create, exactly to avoid the kind of violation of the Observable contract that you have in your code - namely not calling onCompleted. I recommend something like this:

@Override
public Observable<List<SocialNews>> getStream(@Path("stream") String stream, @Query("networks") String networks, @Query("page") Integer page, @Query("limit") Integer limit) {
Observable.fromCallable(() -> NetworkUtils.hasActiveInternetConnection(context))
          .flatMap(hasConnection -> {
            if (hasConnection) {
                return APIClient.getService(context).getStream(stream, networks, page, limit);
            } else {
                return Observable.just(NetworkUtils.getCachedStream())
            }
          })
}

Also, there is a really nice pattern for doing this kind of fallback-type data retrieval using concat and first. See this blog post from Dan Lew for details.

Upvotes: 2

JohnWowUs
JohnWowUs

Reputation: 3083

You aren't calling onCompleted on your observer.

Upvotes: 2

Related Questions