Reputation: 91
I have been working with Retrofit for my Android application's REST API calls and it has been working out great. However I have run into issues around losing connectivity and request failing. My application involves programmatically connecting to a WiFi network and then once connected, issue API calls to initialize. However sometimes these connections are slow and I needed a way to be able to resend these requests when they fail. This is where I turned to RxJava, since there is not support for this in Retrofit. What I want this to do is once connected to the new WiFi network issue a ping to the REST API and if there is a network error, wait one second and retry the ping. Here is my code:
Function declaration in interface:
@Headers("Content-Type: application/json")
@GET("something")
io.reactivex.Observable<String> ping();
Retrofit declaration in Activity
Retrofit retrofit = new Retrofit.Builder()
.baseUrl(Data.cloudEndpoint)
.addConverterFactory(GsonConverterFactory.create())
.addCallAdapterFactory(RxJava2CallAdapterFactory.create())
.build();
avfsService = retrofit.create(MyClass.class);
Attempt at executing API call in Activity
service.ping()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Exception {
Log.d(TAG, "retryWhen");
return Observable.timer(1, TimeUnit.SECONDS);
}
})
.blockingSubscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscibe " + d.toString());
}
@Override
public void onNext(String s) {
Log.d(TAG, "onNext " + s);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError");
e.printStackTrace();
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
launch();
}
});
So when this is executed here is the output I see in Logcat
onSubscibe null
retryWhen
onComplete
So onNext is never called and I am not too sure why. I thought that onNext is called every time data is emitted from the Obeservable and I thought that by creating an Observable and subscribing to it, that the API call would be executed, and the response would be emitted by the Observable. This appears to not be the case as I am now not even able to recreate the network error and onComplete is called immediately.
I am using a blockingSubscribe here because I have seen others say that can be helpful is executing on a io thread and observing on the main thread, but no luck. I see the same behavior when I just use .subscribe().
I think I am missing something but don't understand if my API call is not being executed or if I am just not getting the response correctly.
Any help is greatly appreciated.
Upvotes: 1
Views: 1678
Reputation: 25623
Your problem comes from the fact that you're using blockingSubscribe
. This blocks the thread until the observable has completed.
Since you're telling it to observe on the android thread and this is presumably the thread which is being blocked, it never gets the opportunity to send these onNext
calls unless onComplete
has been called, which makes it ignore any further onNext
calls.
It is possible for your given code to not deadlock because observeOn
reserves the right to prioritize onComplete
and onError
calls if it is not explicitly told to keep the order. This means it allows the onComplete
calls to go through, unblocking the thread.
Upvotes: 0
Reputation: 589
Try this instead:
service.ping()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Exception {
Log.d(TAG, "retryWhen");
return Observable.timer(1, TimeUnit.SECONDS);
}
})
.subscribe(new Subscriber<String>() {
@Override
public void onNext(String s) {
Log.d(TAG, "onNext " + s);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError");
e.printStackTrace();
}
@Override
public void onCompleted() {
Log.d(TAG, "onComplete");
launch();
}
});
Upvotes: 0
Reputation: 32036
Your confusion is probably coming from the fact that the retryWhen
operator only calls your retry logic function once. Since it passes you an Observable
of Throwable
s, it wants you to emit your retry observables when you get a new error
emitted from throwableObservable
. The easiest way to do this is to flatmap
the errors into your retry logic --
.retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Exception {
Log.d(TAG, "retryWhen");
return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Throwable throwable) throws Exception {
Log.d(TAG, "retrying");
return Observable.timer(1, TimeUnit.SECONDS);
}
});
}
})
If you run the above, you will still see "retryWhen" called once, and then "retrying" every second while there are errors until you get a onNext
and onComplete
. You may want to add code to give up after a certain time, or at least make sure you dispose of the subscription properly so it doesn't run forever.
As a side note, this code (and RxJava in general) is a lot more readable with lambdas.
If you are still trying to get your head around retryWhen
check out this article by Dan Lew has a good explanation.
Upvotes: 2