Reputation: 189
I am having an issue with the RxJava concat operator. I have two observables: one emits results from a server database and the other emits results from the local database. I then concat them:
// Uses Retrofit
Observable<MyResult> remoteObservable = mRemoteDataSource.find(tId);
// Uses a Realm in the UI thread
Observable<MyResult> localObservable = mLocalDataSource.find(tId);
Observable.concat(localObservable, remoteObservable)
    .doOnNext(result -> { /* Do my stuff */ })
    .observeOn(AndroidSchedulers.mainThread())
    .doOnError(throwable -> throwable.printStackTrace())
    .subscribe();
This causes a problem: since I am not using subscribeOn(), the concatenated observable runs on the main thread (AndroidSchedulers.mainThread()), so the remote request never runs and a NetworkOnMainThreadException is thrown.
If I add subscribeOn(Schedulers.computation()), I get "Realm access from incorrect thread. Realm objects can only be accessed on the thread they were created", since of course the Observable is no longer running on the thread where the Realm instance exists.
I have searched other questions and have not found anything useful. I have also checked the example made by Realm: https://github.com/realm/realm-java/blob/master/examples/rxJavaExample/src/main/java/io/realm/examples/rxjava/retrofit/RetrofitExample.java but, strangely, the Retrofit observable there is not subscribed on any scheduler and it still works.
Why does it work in the sample but not in my code? Any suggestions?
Upvotes: 2
Views: 1027
Reputation: 12857
Instead of using subscribeOn, as in mRealmDataSource.find(tId).subscribeOn(AndroidSchedulers.mainThread()) (suggested here: https://stackoverflow.com/a/39304891/2425851), you can use Observable.defer. For example:
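class RealmDataSource {
    fun find(id: String): Observable<MyResult> {
        // Default pattern for loading data on a background thread.
        // defer() postpones opening the Realm until subscription, so it is
        // opened on whichever thread subscribes.
        return Observable.defer {
            val realm = Realm.getDefaultInstance()
            val query = realm.where(MyResult::class.java)
            val flowable =
                if (realm.isAutoRefresh) {
                    // Looper thread: query asynchronously and wait until loaded
                    query
                        .findAllAsync()
                        .asFlowable()
                        .filter { it.isLoaded }
                } else {
                    // Non-looper thread: query synchronously
                    Flowable.just(query.findAll())
                }
            return@defer flowable
                .toObservable()
                // Flatten each RealmResults<MyResult> into single items
                // so that the declared return type holds
                .flatMapIterable { it }
        }
    }
}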
The usage then needs no subscribeOn:
// Uses a Realm
Observable<MyResult> realmObservable = mRealmDataSource.find(tId);
// Uses Retrofit
Observable<MyResult> remoteObservable = mRemoteDataSource.find(tId);
For more info see https://realm.io/blog/realm-java-0-87-0/
Upvotes: 0
Reputation: 11921
You can concat your local and remote observables like below:
// Uses Retrofit
Observable<MyResult> remoteObservable = mRemoteDataSource.find(tId);
// Uses a Realm in the UI thread
Observable<MyResult> localObservable = mLocalDataSource.find(tId);
Observable.concat(localObservable, remoteObservable).first()
    .map(new Func1<MyResult, MyResult>() {
        @Override
        public MyResult call(MyResult result) {
            if (result == null) {
                throw new IllegalArgumentException();
            }
            return result;
        }
    });
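To make the effect of concat(...).first() easier to see, here is a plain-JDK analogy rather than RxJava code. It is only a sketch: the class ConcatFirstSketch, the method firstAvailable, and the supplier names are hypothetical, and each source is modeled as a Supplier that returns an empty Optional when it has nothing to emit. The point it illustrates is that sources are tried in order and a later source is not touched once an earlier one has produced a value.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class ConcatFirstSketch {

    // Tries each source in order and returns the first value found.
    // Sources after the first hit are never invoked, which is the
    // behavior this sketch borrows from concat(...).first().
    public static <T> Optional<T> firstAvailable(List<Supplier<Optional<T>>> sources) {
        for (Supplier<Optional<T>> source : sources) {
            Optional<T> value = source.get();
            if (value.isPresent()) {
                return value;
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Counts how often the (hypothetical) remote source is actually called.
        AtomicInteger remoteCalls = new AtomicInteger();

        Supplier<Optional<String>> emptyLocal = Optional::empty;
        Supplier<Optional<String>> cachedLocal = () -> Optional.of("cached");
        Supplier<Optional<String>> remote = () -> {
            remoteCalls.incrementAndGet();
            return Optional.of("fetched");
        };

        // Local has data: remote is skipped.
        System.out.println(firstAvailable(Arrays.asList(cachedLocal, remote)).get());
        // Local is empty: remote is used.
        System.out.println(firstAvailable(Arrays.asList(emptyLocal, remote)).get());
        // Remote was called once, for the second lookup only.
        System.out.println("remote calls: " + remoteCalls.get());
    }
}
```

One difference to keep in mind: this sketch returns an empty Optional when every source is empty, whereas RxJava's first() signals an error in that case.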
And subscribe like below:
CompositeSubscription mCompositeSubscription = new CompositeSubscription();
final Subscription subscription = mRepo.find(tId)
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Observer<MyResult>() {
        @Override
        public void onCompleted() {
            // Completed
        }
        @Override
        public void onError(Throwable e) {
            // onError
        }
        @Override
        public void onNext(MyResult result) {
            // onSuccess
        }
    });
mCompositeSubscription.add(subscription);
You can check this repo for RxJava + Retrofit + Realm https://github.com/savepopulation/wikilight
Good luck!
Upvotes: 2
Reputation: 81539
I believe you should use subscribeOn() in the right places.
// Uses a Realm in the UI thread
Observable<MyResult> realmObservable = mRealmDataSource.find(tId).subscribeOn(AndroidSchedulers.mainThread());
// Uses Retrofit
Observable<MyResult> retrofitObservable = mRetrofitDataSource.find(tId).subscribeOn(Schedulers.io());
Observable.concat(realmObservable, retrofitObservable)
    .doOnNext(result -> { /* Do my stuff */ })
    .subscribeOn(AndroidSchedulers.mainThread())
    .observeOn(AndroidSchedulers.mainThread())
    .doOnError(throwable -> throwable.printStackTrace())
    .subscribe();
See if it fixes your issue.
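The idea of giving each source its own thread can be sketched without RxJava or Realm. The following is a plain-JDK analogy under stated assumptions: ConfinedStore is a hypothetical stand-in for a thread-confined object such as a Realm instance, and the executors named "ui" and "io" stand in for the main thread and an I/O scheduler. The local read runs on the thread that owns the store, the remote call runs on the other thread only after the local read has finished, and a read from the wrong thread fails.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class PerSourceThreadSketch {

    // Hypothetical thread-confined store: only the creating thread may read it.
    static final class ConfinedStore {
        private final Thread owner = Thread.currentThread();

        String read() {
            if (Thread.currentThread() != owner) {
                throw new IllegalStateException("Store accessed from incorrect thread");
            }
            return "local";
        }
    }

    // Appends the name of the current thread to a value.
    static String tag(String value) {
        return value + "@" + Thread.currentThread().getName();
    }

    public static List<String> demo() {
        ExecutorService ui = Executors.newSingleThreadExecutor(r -> new Thread(r, "ui"));
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "io"));
        try {
            // Create the store on the "ui" thread so that it is confined there.
            ConfinedStore store = CompletableFuture.supplyAsync(ConfinedStore::new, ui).join();

            // Local read on "ui" first, then the remote call on "io".
            List<String> out = new ArrayList<>(CompletableFuture
                    .supplyAsync(() -> tag(store.read()), ui)
                    .thenCompose(local -> CompletableFuture
                            .supplyAsync(() -> tag("remote"), io)
                            .thenApply(remote -> Arrays.asList(local, remote)))
                    .join());

            // Reading the store from "io" fails, like the Realm error in the question.
            try {
                CompletableFuture.supplyAsync(store::read, io).join();
                out.add("no error");
            } catch (CompletionException e) {
                out.add("error: " + e.getCause().getMessage());
            }
            return out;
        } finally {
            ui.shutdown();
            io.shutdown();
        }
    }

    public static void main(String[] args) {
        // Prints [local@ui, remote@io, error: Store accessed from incorrect thread]
        System.out.println(demo());
    }
}
```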
Upvotes: 3