Reputation: 971
The Problem
I have an activity which fetches data from an API periodically and displays the data received. The API uses OAuth so I receive a temporary access token which expires after a certain period of time (1 hr). If the app tries to get data with an expired token, obviously the request will fail. In an earlier iteration of my app, I was using AsyncTasks for the network requests and essentially just executed a new AsyncTask that would get a new access token before calling the main AsyncTask that fetches the data from the server. This worked great because the main AsyncTask would wait until the other one was finished before executing.
I recently switched to RxJava and basically just replaced the AsyncTasks with Observables. The problem is that the main Observable that fetches the data doesn't wait for the Observable that refreshes the access token to finish. Here's my code, thanks for your help.
Code
LiveThreadActivity.java
private Subscription subscription;
private Observable<List<CustomComment>> fetchData;

@Override
protected void onResume() {
    super.onResume();
    if (tokenExpired()) {
        auth.refreshToken();
    }
    subscription = fetchData
            .compose(bindToLifecycle())
            .retryWhen(new RetryWithDelay(5, 2000))
            .subscribe(list -> addNewComments(list), e -> handleFetchDataError(e));
}
// This method gets called in onCreate()
private void dataCollection() {
    fetchData = Observable.interval(0, REFRESH_RATE, TimeUnit.MILLISECONDS)
            .map(tick -> fetchNewComments()) // Run function every time a tick is emitted
            .retryWhen(new RetryWithDelay(2, 2000)) // Retry twice with 2 second delay
            .subscribeOn(Schedulers.io()) // Network stuff in background thread
            .observeOn(AndroidSchedulers.mainThread()); // Other stuff on the main thread
}
Auth.java
public class Auth {
    ...
    public void refreshToken() {
        Observable.just(1)
                .map(y -> refreshAccessToken())
                .retryWhen(new RetryWithDelay(3, 2000))
                .subscribeOn(Schedulers.io())
                .subscribe();
    }
}
Upvotes: 1
Views: 4312
Reputation: 4077
.flatMap() will probably be sufficient, i.e. tokenObservable.flatMap(/* return dataObservable */)
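A minimal sketch of that chaining idea. It uses the JDK's CompletableFuture instead of RxJava so that it runs without extra dependencies; thenCompose stands in for flatMap here. The class name and both methods are hypothetical stand-ins for the question's token refresh and data fetch, not the asker's actual code.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class ChainedRefresh {
    // Hypothetical stand-in for the question's token refresh call.
    static CompletableFuture<String> refreshAccessToken() {
        return CompletableFuture.supplyAsync(() -> "fresh-token");
    }

    // Hypothetical stand-in for the data fetch; it takes the token as input.
    static CompletableFuture<List<String>> fetchNewComments(String token) {
        return CompletableFuture.supplyAsync(
                () -> Arrays.asList("comment fetched with " + token));
    }

    // The fetch is only created once the refresh has produced a token,
    // which is the ordering that flatMap provides in RxJava.
    static CompletableFuture<List<String>> refreshThenFetch() {
        return refreshAccessToken().thenCompose(ChainedRefresh::fetchNewComments);
    }

    public static void main(String[] args) {
        System.out.println(refreshThenFetch().join());
    }
}
```

In the question's terms this would correspond to something like auth.refreshToken().flatMap(ignored -> fetchData), assuming refreshToken() is changed to return its Observable instead of subscribing internally.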
Upvotes: 1
Reputation: 20258
Reactive libraries require a new way of thinking. You write the code as if it were synchronous, but you have to keep in mind that it executes asynchronously.
Your code reads as if it runs sequentially, but it does not: refreshToken() returns immediately, so both Observables end up running at the same time.
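To make the problem concrete, here is a small sketch of a fire-and-forget refresh. It uses the JDK's CompletableFuture and a CountDownLatch as a simulated network wait, not RxJava; the class, the token field, and the method names are all hypothetical. It shows that whatever runs right after the refresh is started still sees the old token.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

public class FireAndForget {
    static final AtomicReference<String> token = new AtomicReference<>("expired");

    // Starts the refresh on another thread and returns immediately,
    // like a refreshToken() that subscribes internally and returns void.
    static CompletableFuture<Void> startRefresh(CountDownLatch serverReplied) {
        return CompletableFuture.runAsync(() -> {
            try {
                serverReplied.await(); // simulated network wait
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            token.set("fresh");
        });
    }

    // Returns the token seen right after starting the refresh
    // and the token seen once the refresh has finished.
    static String[] observeTokens() {
        token.set("expired");
        CountDownLatch serverReplied = new CountDownLatch(1);
        CompletableFuture<Void> refresh = startRefresh(serverReplied);
        String seenImmediately = token.get(); // what a fetch started now would use
        serverReplied.countDown();            // let the simulated server answer
        refresh.join();                       // explicitly wait for the refresh
        String seenAfterwards = token.get();
        return new String[] {seenImmediately, seenAfterwards};
    }

    public static void main(String[] args) {
        String[] seen = observeTokens();
        System.out.println(seen[0] + " -> " + seen[1]); // prints "expired -> fresh"
    }
}
```

The fetch has to be tied to the completion of the refresh rather than to the call that starts it.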
The function refreshToken() should look like:
public Observable<?> refreshToken() {
    return Observable.just(1)
            .map(y -> refreshAccessToken())
            .retryWhen(new RetryWithDelay(3, 2000))
            .subscribeOn(Schedulers.io());
}
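And onResume():
@Override
protected void onResume() {
    super.onResume();
    Observable<List<CustomComment>> obs = fetchData
            .compose(bindToLifecycle())
            .retryWhen(new RetryWithDelay(5, 2000));
    if (tokenExpired()) {
        // Run the refresh first, but drop its result so that only
        // comment lists reach the subscriber.
        obs = obs.startWith(
                auth.refreshToken().flatMap(token -> Observable.<List<CustomComment>>empty()));
    }
    subscription = obs
            .subscribe(list -> addNewComments(list), e -> handleFetchDataError(e));
}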
Notice the startWith() operator. It subscribes to one Observable (refreshing the token) first and only moves on to the other (fetching the list) once the first has completed.
Upvotes: 3