Aleksandr

Reputation: 4936

Subscribe on RxJava observable multiple times

I have a question about RxJava Observables. For example, I have a Retrofit interface that returns an Observable, and I need to do something with this stream of videos. Here is the code that downloads the videos and saves them into a list:

API.getVideoListObservable()
    .doOnError(t -> t.printStackTrace())
    .map(r -> r.getObjects())
    .doOnNext(l -> VideoActivity.this.runOnUiThread(() -> fragment.updateVideoList(l)))
    .doOnNext(l -> kalturaVideoList.addAll(l))
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe();

Here, API is the Retrofit REST adapter.

If I want to update the video list, should I repeat the same operations? Or should I keep the Subscription, unsubscribe from it, and subscribe again, like this:

Subscription s = API.getVideoListObservable()
    .doOnError(t -> t.printStackTrace())
    .map(r -> r.getObjects())
    .doOnNext(l -> VideoActivity.this.runOnUiThread(() -> fragment.updateVideoList(l)))
    .doOnNext(l -> kalturaVideoList.addAll(l))
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe();

//some code
...

s.unsubscribe();
s = null;
s = API.getVideoListObservable()
    .doOnError(t -> t.printStackTrace())
    .map(r -> r.getObjects())
    .doOnNext(l -> VideoActivity.this.runOnUiThread(() -> fragment.updateVideoList(l)))
    .doOnNext(l -> kalturaVideoList.addAll(l))
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe();

Upvotes: 4

Views: 7222

Answers (1)

Pavel Synek

Reputation: 1229

Your flow of events is wrong: you should update the UI in the subscriber, not in doOnNext.

API.getVideoListObservable()
    .map(r -> r.getObjects())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(videos -> {
        // do stuff with videos
        fragment.updateVideoList(videos);
        kalturaVideoList.addAll(videos);
    }, throwable -> {
        // handle errors
        throwable.printStackTrace();
    });

Retrofit 1.x already runs Observable requests on a background thread for you, so the explicit subscribeOn(Schedulers.io()) is not needed here.

Move everything into a function and call it whenever you need to update the list. There is no need to manually unsubscribe because subscribers automatically unsubscribe when they receive onCompleted or onError.
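
To illustrate the paragraph above, here is a minimal sketch in plain Java, assuming RxJava 1.x on the classpath and no Android or Retrofit. VideoRefresher, getVideos(), and the sample titles are hypothetical stand-ins for API.getVideoListObservable().map(r -> r.getObjects()); the clear() call is my own addition so that repeated refreshes replace the list instead of appending to it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import rx.Observable;
import rx.Subscription;

public class VideoRefresher {
    // Hypothetical stand-in for the Retrofit call; defer() builds a new
    // source for every subscriber, like a fresh network request would.
    static Observable<List<String>> getVideos() {
        return Observable.defer(() -> {
            List<String> videos = Arrays.asList("intro", "demo");
            return Observable.just(videos);
        });
    }

    final List<String> videoList = new ArrayList<>();

    // Call this whenever the list needs updating. Every call subscribes again.
    Subscription refresh() {
        return getVideos().subscribe(
                videos -> {
                    videoList.clear();          // assumption: replace, don't append
                    videoList.addAll(videos);
                },
                error -> error.printStackTrace());
    }

    public static void main(String[] args) {
        VideoRefresher refresher = new VideoRefresher();
        Subscription first = refresher.refresh();
        Subscription second = refresher.refresh();
        System.out.println(refresher.videoList);
        System.out.println(first.isUnsubscribed() + " " + second.isUnsubscribed());
    }
}
```

Each refresh() call creates a new subscription. Because the source completes after emitting, the Subscription it returns has already been unsubscribed by the time the synchronous call finishes, which is why no manual cleanup is needed in this simple case.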

Beware that when using RxJava on Android, you have to deal with the Activity/Fragment lifecycle and configuration changes. You have to keep track of pending network calls and manually unsubscribe when the user navigates to another Activity/Fragment.
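
One common way to do that bookkeeping by hand is a CompositeSubscription. The sketch below assumes RxJava 1.x; VideoScreen, load(), and onStop() are hypothetical names (plain Java, no Android classes), and a PublishSubject stands in for a network call that is still in flight.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import rx.Observable;
import rx.subjects.PublishSubject;
import rx.subscriptions.CompositeSubscription;

public class VideoScreen {
    // Collects every subscription that may still be running.
    private final CompositeSubscription pending = new CompositeSubscription();
    final List<String> shown = new ArrayList<>();

    // Subscribe and remember the Subscription so it can be cancelled later.
    void load(Observable<List<String>> request) {
        pending.add(request.subscribe(
                videos -> {
                    shown.clear();
                    shown.addAll(videos);
                },
                error -> error.printStackTrace()));
    }

    // Hypothetical hook: call it from onPause()/onDestroy() in a real Activity.
    // clear() unsubscribes everything but leaves the composite reusable.
    void onStop() {
        pending.clear();
    }

    public static void main(String[] args) {
        PublishSubject<List<String>> fakeCall = PublishSubject.create();
        VideoScreen screen = new VideoScreen();
        screen.load(fakeCall);
        screen.onStop();                          // user leaves before the response
        fakeCall.onNext(Arrays.asList("late"));   // result arrives after unsubscribing
        System.out.println(screen.shown);
    }
}
```

clear() is used rather than unsubscribe() on the composite because a composite that has itself been unsubscribed immediately unsubscribes anything added to it later, which would break the next load().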

My preferred way of solving all the RxJava + Android problems I've encountered so far is the RxLoader library (https://github.com/evant/rxloader). Your code using RxLoader would look like this:

private Observable<List<Video>> getVideos() {
    return API.getVideoListObservable()
        .map(r -> r.getObjects());
}

// in onCreate()
RxLoaderManager loaderManager = RxLoaderManager.get(this);
RxLoader<List<Video>> videoLoader = loaderManager.create(
    getVideos().observeOn(AndroidSchedulers.mainThread()),
    new RxLoaderObserver<List<Video>>() {
        @Override
        public void onNext(List<Video> videos) {
            // do stuff with videos
            fragment.updateVideoList(videos);
            kalturaVideoList.addAll(videos);
        }

        @Override
        public void onError(Throwable throwable) {
            // handle errors
            throwable.printStackTrace();
        }
    });

// whenever you need to update videos
videoLoader.restart();

Upvotes: 5
