No Soy Beto
No Soy Beto

Reputation: 189

Realm + Retrofit + RxJava: Concat and SubscribeOn

I am having an issue while using the RxJava concat operator. I have two observables, the first emits results from a server database and the other one emits results from the local database, and then I concat the :

// Uses a Realm in the UI thread
Observable<MyResult> remoteObservable = mRemoteDataSource.find(tId);

// Uses Retrofit
Observable<MyResult> localObservable = mLocalDataSource.find(tId);

Observable.concat(localObservable, remoteObservable)
    .doOnNext(result -> /* Do my stuff */)
    .observeOn(AndroidSchedulers.mainThread())
    .doOnError(throwable -> throwable.printStackTrace())
    .subscribe()

So this causes me problem, since I am not using subscribeOn() the concatenated observable is running on AndroidScheduler.MainThread() and this does not run the remote and it launches a NetworkOnMainThreadException.

If I implement a subscribeOn(Schedulers.computation()) I get Realm access from incorrect thread. Realm objects can only be accessed on the thread they were created since of course the Observable is not running on the thread the Realm instance does exist.

I have searched in other questions and I have not gotten anything useful, I have checked the example made by realm: https://github.com/realm/realm-java/blob/master/examples/rxJavaExample/src/main/java/io/realm/examples/rxjava/retrofit/RetrofitExample.java but strangely I see that the retrofit observable is subscribed on nothing and it works.

Why does it work on the sample and in my code I cannot do the same? Any suggestion?

Upvotes: 2

Views: 1027

Answers (3)

NickUnuchek
NickUnuchek

Reputation: 12857

Instead of using subscribeOn at mRealmDataSource.find(tId).subscribeOn(AndroidSchedulers.mainThread()) like said : https://stackoverflow.com/a/39304891/2425851

You can use Observable.defer for example:

class RealmDataSource{
fun find(id: String): Observable<MyResult> {
// Default pattern for loading data on a background thread
return Observable.defer{
                val realm = Realm.getInstance()

                val query = realm
                    .where(MyResult::class.java)

                val flowable =
                    if (realm.isAutoRefresh) {
                        query
                            .findAllAsync()
                            .asFlowable()
                            .filter(RealmResults::isLoaded)
                    } else {
                        Flowable.just(query.findAll())
                    }

                return@defer flowable
                    .toObservable()
            }
}

Then usage will be without subscribeOn

// Uses a Realm
Observable<MyResult> realmObservable = mRealmDataSource.find(tId);

// Uses Retrofit
Observable<MyResult> remoteObservable = mRemoteDataSource.find(tId);

For more info see https://realm.io/blog/realm-java-0-87-0/

Upvotes: 0

savepopulation
savepopulation

Reputation: 11921

You can concat your local and remote observables like below:

// Uses a Realm in the UI thread
Observable<MyResult> remoteObservable = mRemoteDataSource.find(tId);

// Uses Retrofit
Observable<MyResult> localObservable = mLocalDataSource.find(tId);

Observable.concat(localObservable, remoteObservable).first()
                .map(new Func1<MyResult, MyResult>() {
                    @Override
                    public myResult call(MyResult result) {
                        if (result == null) {
                            throw new IllegalArgumentException();
                        }
                        return result;
                    }
                });

And subscribe like below:

CompositeSubscription mCompositeSubscription = new CompositeSubscription();
final Subscription subscription = mRepo.find(tId
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<MyResult>() {
                    @Override
                    public void onCompleted() {
                        // Completed
                    }

                    @Override
                    public void onError(Throwable e) {
                        // onError
                    }

                    @Override
                    public void onNext(MyResult result) {
                        //onSuccess
                    }
                });
mCompositeSubscription.add(subscription);

You can check this repo for RxJava + Retrofit + Realm https://github.com/savepopulation/wikilight

Good luck!

Upvotes: 2

EpicPandaForce
EpicPandaForce

Reputation: 81539

I believe you should use subscribeOn() in the right places.

// Uses a Realm in the UI thread
Observable<MyResult> realmObservable = mRealmDataSource.find(tId).subscribeOn(AndroidSchedulers.mainThread());

// Uses Retrofit
Observable<MyResult> retrofitObservable = mRetrofitDataSource.find(tId).subscribeOn(Subscribers.io());

Observable.concat(realmObservable, retrofitObservable)
    .doOnNext(result -> /* Do my stuff */)
    .subscribeOn(AndroidSchedulers.mainThread())
    .observeOn(AndroidSchedulers.mainThread())
    .doOnError(throwable -> throwable.printStackTrace())
    .subscribe()

See if it fixes your issue.

Upvotes: 3

Related Questions