Reputation: 141
I have this simple code below that simulates a scenario Im currently trying to accomplish
mApiService.api().postSomethingWithAccessToken(request, "some_invalid_access_token")
.subscribeOn(Schedulers.io())
.retryWhen(new Function<Observable<Throwable>, ObservableSource<AccessToken>>() {
@Override
public ObservableSource<AccessToken> apply(Observable<Throwable> throwableObservable) throws Exception {
return mApiService.api().getAccessToken();
}
})
.subscribeOn(Schedulers.io())
.subscribe(new Observer<Void>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Void value) {
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
onError(e);
}
@Override
public void onComplete() {
}
});
Ill just enumerate it to make my goal clear:
based on the code above and my understanding so far with .retryWhen(), is that it will execute if an error happened on the original Observable( .postSomethingWithAccessToken()), and retrying if necessary (based on your conditions inside retry), what happens here is that the .retryWhen() executes first before the outer Observable, causing undesired duplicate request, how can I achieve those things I mentioned above, based on my current understanding(code)? Any help will be greatly appreciated. :(
Edit: Current workaround:
mApiService.api().postSomethingWithAccessToken(request, preferences.getString("access_token", ""))
.subscribeOn(Schedulers.io())
.retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(final Observable<Throwable> throwableObservable) throws Exception {
return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Throwable throwable) throws Exception {
if (throwable instanceof HttpException) {
HttpException httpException = (HttpException) throwable;
if (httpException.code() == 401) {
return mApiService.api().getAccessToken()
.doOnNext(new Consumer<Authentication>() {
@Override
public void accept(Authentication authentication) throws Exception {
update(authentication);
}
});
}
}
return Observable.error(throwable);
}
});
}
})
.subscribe(new Observer<Void>() {
@Override
public void onSubscribe(Disposable d) {
Log.e("subscribe", "TOKEN : " + preferences.getString("access_token", ""));
}
@Override
public void onNext(Void value) {
Log.e("onNext", "TOKEN : " + preferences.getString("access_token", ""));
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onComplete() {
Log.e("Complete", "____ COMPLETE");
}
});
Method that updates the token via shared preference
public void update(Authentication authentication) {
preferences.edit().putString("access_token", authentication.getAccessToken()).commit();
}
I noticed that(i put a Log) the outer observable's subscribe and the retryWhen was executed at main thread, but the stream of retrying/resubscribing is jumping over different Scheduler's thread, it seems like a race condition :(
onSubscrbie_outer_observable: Thread[main,5,main]
RetryWhen: Thread[main,5,main]
Throwable_FlatMap: Thread[RxCachedThreadScheduler-1,5,main]
doOnNext(Token_Refresh): Thread[RxCachedThreadScheduler-1,5,main]
Throwable_FlatMap: Thread[RxCachedThreadScheduler-2,5,main]
doOnNext(Token_Refresh): Thread[RxCachedThreadScheduler-2,5,main]
Throwable_FlatMap: Thread[RxCachedThreadScheduler-1,5,main]
doOnNext(Token_Refresh): Thread[RxCachedThreadScheduler-1,5,main]
// and so on...
Upvotes: 4
Views: 1602
Reputation: 141
BIG Thanks to yosriz as he pointed me in the right direction to solve my teeth-grinding problem, I have to use defer
. So I ended up with this issue in GitHub, Why resubscribe the source observable emit same output when I use retryWhen operator?
It's exactly the same issue I'm having right now, for anyone experiencing the same issue here here is my solution.
Observable
.defer(new Callable<ObservableSource<?>>() {
@Override
public ObservableSource<?> call() throws Exception {
// return an observable source here, the observable that will be the source of the entire stream;
}
})
.subscribeOn( /*target thread to run*/ )
.retryWhen( {
// return a throwable observable here that will perform the logic when an error occurred
})
.subscribe( /*subscription here*/ )
or here is the full non-lambda of my solution
Observable
.defer(new Callable<ObservableSource<?>>() {
@Override
public ObservableSource<?> call() throws Exception {
return mApiService.api().postSomethingWithAccessToken(
request, preferences.getString("access_token", ""));
}
})
.subscribeOn(Schedulers.io())
.retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(final Observable<Throwable> throwableObservable) throws Exception {
return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Throwable throwable) throws Exception {
if (throwable instanceof HttpException) {
HttpException httpException = (HttpException) throwable;
if (httpException.code() == 401) {
return mApiService.api().getAccessToken().doOnNext(new Consumer<Authentication>() {
@Override
public void accept(Authentication authentication) throws Exception {
update(authentication);
}
});
}
}
return Observable.error(throwable);
}
});
}
})
.subscribe(new Observer<Void>() {
@Override
public void onSubscribe(Disposable d) {
Log.e("subscribe", "TOKEN : " + preferences.getString("access_token", ""));
}
@Override
public void onNext(Void value) {
Log.e("onNext", "TOKEN : " + preferences.getString("access_token", ""));
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onComplete() {
Log.e("Complete", "____ COMPLETE");
}
});
The keypoint here is "how to modify/update the existing source observable when .retryWhen()
operator resubscribe to the source observable"
Upvotes: 1
Reputation: 10267
There are few problems here:
postSomethingWithAccessToken
method when retrying, otherwise you'll just retry with the same old invalid access token.Observable
you get and put your retry logic there. as you were saying this method is executed first, not when error happens, the throwableObservable
is what response to error, it will mirror errors as emissions (onNext()
), you can flatMap()
each error and response either with error (for delivering error to the source stream) complete , or with onNext()
with some object to signal it to retry.So you need:
1) to store the Access Token somewhere where you can change it with access token refresh.
2) fix the retry when logic to respond properly to errors
Here's a suggestion code:
postSomethingWithAccessToken(request, accessToken)
.subscribeOn(Schedulers.io())
.retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(
@NonNull Observable<Throwable> throwableObservable) throws Exception {
return throwableObservable.flatMap(
new Function<Throwable, ObservableSource<? extends R>>() {
@Override
public ObservableSource<? extends R> apply(
@NonNull Throwable throwable) throws Exception {
if (throwable.code == 401) { //or 404/403, just a pseudo-code, put your real error comparing logic here
return getAccessToken()
.doOnNext(refreshedToken -> accessToken.updateToken(refreshedToken));
//or keep accessToken on some field, the point to have mutable
//var that you can change and postSomethingWithAccessToken can see
}
return Observable.error(throwable);
}
});
}
}
)
.subscribeOn(Schedulers.io())
.subscribe(new Consumer<Result>() {
@Override
public void accept(@NonNull Result result) throws Exception {
//handle result
}
}
);
Upvotes: 3