Reputation: 4936
I have a question about RxJava Observable. For example, I have an Retrofit interface, which returns me Observable. I need to do something with this stream of Video. Here is code for downloading Videos and saving it's into list:
API.getVideoListObservable()
.doOnError(t -> t.printStackTrace())
.map(r -> r.getObjects())
.doOnNext(l -> VideoActivity.this.runOnUiThread(() -> fragment.updateVideoList(l)))
.doOnNext(l -> kalturaVideoList.addAll(l))
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe();
API - retrofit rest adapter
If want to update videolist, should I do same operations? Or I should get subscription, unsubscribe from it and subscribe again like this:
Subscription s = API.getVideoListObservable()
.doOnError(t -> t.printStackTrace())
.map(r -> r.getObjects())
.doOnNext(l -> VideoActivity.this.runOnUiThread(() -> fragment.updateVideoList(l)))
.doOnNext(l -> kalturaVideoList.addAll(l))
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe();
//some code
...
s.unsubscribe();
s = null;
s = API.getVideoListObservable()
.doOnError(t -> t.printStackTrace())
.map(r -> r.getObjects())
.doOnNext(l -> VideoActivity.this.runOnUiThread(() -> fragment.updateVideoList(l)))
.doOnNext(l -> kalturaVideoList.addAll(l))
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe();
Upvotes: 4
Views: 7222
Reputation: 1229
Your whole flow of events is wrong, you should be updating the UI in subscriber, not in doOnNext.
API.getVideoListObservable()
.map(r -> r.getObjects())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(videos -> {
// do stuff with videos
fragment.updateVideoList(videos);
kalturaVideoList.addAll(videos);
}, throwable -> {
// handle errors
throwable.printStackTrace();
});
Retrofit sets subscribeOn(Schedulers.io())
for you.
Move everything into a function and call it whenever you need to update the list. There is no need to manually unsubscribe because subscribers automatically unsubscribe when they receive onCompleted or onError.
Beware that when using RxJava on Android, you have to deal with Activity/Fragment lifecycle and configuration changes. You have to keep track of pending network calls and manually unsubscribe when user navigates to another Activity/Fragment.
My preferred way of solving all RxJava + Android problems I've encountered so far is using https://github.com/evant/rxloader library. Your code using RxLoader would look like this:
private Observable<List<Video>> getVideos() {
return API.getVideoListObservable()
.map(r -> r.getObjects());
}
// in onCreate()
RxLoaderManager loaderManager = RxLoaderManager.get(this);
RxLoader<List<Video>> videoLoader = loaderManager.create(
getVideos().observeOn(AndroidSchedulers.mainThread()),
new RxLoaderObserver<List<Video>>() {
@Override
public void onNext(List<Video> videos) {
// do stuff with videos
fragment.updateVideoList(videos);
kalturaVideoList.addAll(videos);
}
@Override
public void onError(Throwable throwable) {
// handle errors
throwable.printStackTrace();
}
});
// whenever you need to update videos
videoLoader.restart();
Upvotes: 5