OhWhen

Reputation: 91

Retrofit + RxJava Android onNext never called but onComplete is

I have been using Retrofit for my Android application's REST API calls and it has been working out great. However, I have run into issues with lost connectivity and failing requests. My application programmatically connects to a WiFi network and, once connected, issues API calls to initialize. Sometimes these connections are slow, and I need a way to resend the requests when they fail. This is where I turned to RxJava, since there is no support for this in Retrofit. What I want is this: once connected to the new WiFi network, issue a ping to the REST API, and if there is a network error, wait one second and retry the ping. Here is my code:

Function declaration in interface:

@Headers("Content-Type: application/json")
@GET("something")
io.reactivex.Observable<String> ping();

Retrofit declaration in Activity

Retrofit retrofit = new Retrofit.Builder()
            .baseUrl(Data.cloudEndpoint)
            .addConverterFactory(GsonConverterFactory.create())
            .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
            .build();

service = retrofit.create(MyClass.class);

Attempt at executing API call in Activity

service.ping()
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
            @Override
            public ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Exception {
                Log.d(TAG, "retryWhen");
                return Observable.timer(1, TimeUnit.SECONDS);
            }
        })
        .blockingSubscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscibe " + d.toString());
            }

            @Override
            public void onNext(String s) {
                Log.d(TAG, "onNext " + s);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError");
                e.printStackTrace();
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
                launch();
            }
        });

So when this is executed here is the output I see in Logcat

onSubscibe null
retryWhen
onComplete

So onNext is never called and I am not sure why. I thought that onNext is called every time data is emitted from the Observable, and that by creating an Observable and subscribing to it, the API call would be executed and the response would be emitted by the Observable. This appears not to be the case, as I am now not even able to recreate the network error, and onComplete is called immediately.

I am using blockingSubscribe here because I have seen others say it can be helpful when executing on an io thread and observing on the main thread, but no luck. I see the same behavior when I just use .subscribe().

I think I am missing something but don't understand if my API call is not being executed or if I am just not getting the response correctly.

Any help is greatly appreciated.

Upvotes: 1

Views: 1678

Answers (3)

Kiskae

Reputation: 25623

Your problem comes from the fact that you're using blockingSubscribe. This blocks the thread until the observable has completed.

Since you're telling it to observe on the Android main thread, which is presumably the thread being blocked, it never gets the opportunity to deliver the onNext calls until onComplete has been called, and after that any further onNext calls are ignored.

It is possible for your given code to not deadlock because observeOn reserves the right to prioritize onComplete and onError calls if it is not explicitly told to keep the order. This means it allows the onComplete calls to go through, unblocking the thread.
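The blocking effect described above can be illustrated without RxJava or Android at all. What follows is only a plain-Java analogy under stated assumptions: a single-threaded executor stands in for the main thread, and a latch countdown stands in for an onNext delivery. The class and method names (`BlockedLooperSketch`, `callbackDelivered`) are made up for illustration and do not come from any library.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class BlockedLooperSketch {

    /**
     * Posts a callback to a single "looper" thread and waits for it.
     * If waitOnLooperThread is true, the waiting happens ON the looper
     * thread itself, so the callback stays queued behind the waiter.
     * Returns true if the callback ran before the wait timed out.
     */
    static boolean callbackDelivered(boolean waitOnLooperThread) {
        // Hypothetical stand-in for the Android main thread.
        ExecutorService looper = Executors.newSingleThreadExecutor();
        CountDownLatch delivered = new CountDownLatch(1);

        Callable<Boolean> postAndWait = () -> {
            // Like observeOn(mainThread()): hand the event to the looper.
            looper.execute(delivered::countDown);
            // Like a blocking call: park the current thread until the event arrives.
            return delivered.await(500, TimeUnit.MILLISECONDS);
        };

        try {
            return waitOnLooperThread
                    ? looper.submit(postAndWait).get() // waiter occupies the looper
                    : postAndWait.call();              // waiter is on another thread
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            looper.shutdown(); // queued work still runs, then the thread exits
        }
    }

    public static void main(String[] args) {
        System.out.println("waiting on looper thread: " + callbackDelivered(true));
        System.out.println("waiting on another thread: " + callbackDelivered(false));
    }
}
```

When the waiter runs on the looper thread, the posted callback cannot start until the waiter gives up, which is the same shape of problem as blocking the thread you asked observeOn to deliver on.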

Upvotes: 0

Shrikant

Reputation: 589

Try this instead:

service.ping()
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
            @Override
            public ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Exception {
                Log.d(TAG, "retryWhen");
                return Observable.timer(1, TimeUnit.SECONDS);
            }
        })
        .subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                // RxJava 2's Observer requires onSubscribe; keep d if you need to dispose later.
            }

            @Override
            public void onNext(String s) {
                Log.d(TAG, "onNext " + s);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError");
                e.printStackTrace();
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
                launch();
            }
        });

Upvotes: 0

iagreen

Reputation: 32036

Your confusion is probably coming from the fact that the retryWhen operator calls your retry logic function only once. Since it passes you an Observable of Throwables, it expects you to emit a retry signal each time throwableObservable emits a new error. The easiest way to do this is to flatMap the errors into your retry logic:

.retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
                @Override
                public ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Exception {
                    Log.d(TAG, "retryWhen");
                    return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {
                        @Override
                        public ObservableSource<?> apply(Throwable throwable) throws Exception {
                            Log.d(TAG, "retrying");
                            return Observable.timer(1, TimeUnit.SECONDS);
                        }
                    });
                }
            })

If you run the above, you will still see "retryWhen" logged once, and then "retrying" every second while there are errors, until you get an onNext and onComplete. You may want to add code to give up after a certain time, or at least make sure you dispose of the subscription properly so it doesn't run forever.
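To make the "give up after a while" policy concrete, here is a minimal sketch of the same idea in plain Java rather than RxJava: call a task, wait a fixed delay after each failure, and stop after a maximum number of attempts. It is a blocking loop meant for a background thread, not a drop-in replacement for retryWhen, and the names `RetrySketch` and `retryWithDelay` are hypothetical.

```java
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

final class RetrySketch {

    /**
     * Calls task until it succeeds or maxAttempts calls have failed,
     * sleeping delayMillis between attempts. Blocks the calling thread,
     * so it must not be run on a UI thread.
     */
    static <T> T retryWithDelay(Callable<T> task, int maxAttempts, long delayMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return task.call();
            } catch (Exception e) {
                last = e; // remember the failure in case we give up
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(delayMillis);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt(); // preserve interrupt status
                    throw new IllegalStateException("interrupted while waiting to retry", ie);
                }
            }
        }
        throw new IllegalStateException("gave up after " + maxAttempts + " attempts", last);
    }

    public static void main(String[] args) {
        // Hypothetical flaky ping: fails twice with a network-style error, then succeeds.
        AtomicInteger calls = new AtomicInteger();
        String reply = retryWithDelay(() -> {
            if (calls.incrementAndGet() < 3) {
                throw new IOException("network down");
            }
            return "pong";
        }, 5, 10);
        System.out.println(reply + " after " + calls.get() + " calls"); // pong after 3 calls
    }
}
```

In the RxJava version the equivalent limit would live inside the function passed to retryWhen, for example by counting errors and returning an error Observable once the limit is reached.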

As a side note, this code (and RxJava in general) is a lot more readable with lambdas.

If you are still trying to get your head around retryWhen, check out this article by Dan Lew, which has a good explanation.

Upvotes: 2
