savepopulation

Reputation: 11921

Android Realm + RxJava - Realm access from incorrect thread. Realm objects can only be accessed on the thread they were created

I'm trying to combine RxJava, Realm, and Retrofit using the repository pattern.

Here's my local implementation:

@Override
public Observable<Page> search(@NonNull final String query) {

        return Realm.getDefaultInstance().where(Page.class)
                .equalTo("query", query)
                .findAll()
                .asObservable()
                .cast(Page.class);
    }

Here's my remote implementation:

 @Override
 public Observable<Page> search(@NonNull String query) {
        return mWikiServices.search(query).map(new Func1<Result, Page>() {
            @Override
            public Page call(Result result) {
                final List<Page> pages = new ArrayList<>(result.getQuery().getPages().values());
                return pages.get(0);
            }
        });
    }

Here's my repo implementation:

 final Observable<Page> localResult = mSearchLocalDataSource.search(query);
 final Observable<Page> remoteResult = mSearchRemoteDataSource.search(query)
                .doOnNext(new Action1<Page>() {
                    @Override
                    public void call(Page page) {
                        //mSearchLocalDataSource.save(query, page);
                        //mResultCache.put(query, page);
                    }
                });

        return Observable.concat(localResult, remoteResult)
                .first()
                .doOnError(new Action1<Throwable>() {
                    @Override
                    public void call(Throwable throwable) {
                        throwable.printStackTrace();
                    }
                });

And finally, here's my subscription in the presenter:

final Subscription subscription = mSearchRepository.search(this.mQuery)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<Page>() {
                    @Override
                    public void onCompleted() {
                        // Completed
                    }

                    @Override
                    public void onError(Throwable e) {
                        mView.onDefaultMessage(e.getMessage());
                    }

                    @Override
                    public void onNext(Page page) {
                        mView.onDefaultMessage(page.getContent());
                    }
                });

        mCompositeSubscription.add(subscription);

When I run the code, I get this exception: Realm access from incorrect thread. Realm objects can only be accessed on the thread they were created.

I tried the official solutions from the Realm GitHub repo, but none of them worked. I still get this exception.

I think I get this exception because I'm subscribing on an io thread while the Realm instance gets created on the main thread.
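
To illustrate the thread-confinement rule behind this exception, here is a minimal plain-Java sketch. It uses neither Realm nor RxJava: `ConfinedStore` and `ThreadConfinementDemo` are hypothetical names, and the check in `read()` only imitates the kind of check described by the error message; it is not Realm's code.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical stand-in for a thread-confined handle such as a Realm instance. */
final class ConfinedStore {
    private final Thread owner = Thread.currentThread(); // thread that created the handle
    private final String value;

    ConfinedStore(String value) {
        this.value = value;
    }

    /** Reads the value, failing if called from any thread other than the creator. */
    String read() {
        if (Thread.currentThread() != owner) {
            throw new IllegalStateException("access from incorrect thread");
        }
        return value;
    }
}

final class ThreadConfinementDemo {
    /** Runs the task on a fresh worker thread and returns what it threw, or null. */
    static Throwable runOnWorker(Runnable task) {
        AtomicReference<Throwable> failure = new AtomicReference<>();
        Thread worker = new Thread(() -> {
            try {
                task.run();
            } catch (Throwable t) {
                failure.set(t);
            }
        });
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return failure.get();
    }

    public static void main(String[] args) {
        ConfinedStore store = new ConfinedStore("page"); // created on the calling thread
        System.out.println(store.read());                // same thread: allowed
        System.out.println(runOnWorker(store::read));    // other thread: rejected
        // Creating and reading on the worker itself is allowed.
        System.out.println(runOnWorker(() -> new ConfinedStore("page").read()));
    }
}
```

If the hypothesis above holds, the analogous situation in the question would be a Realm obtained on the thread that calls `search()` and then used on the `Schedulers.io()` thread selected by `subscribeOn`.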

Can anyone suggest an implementation that works?

Thanks.

Upvotes: 0

Views: 2213

Answers (2)

EpicPandaForce

Reputation: 81539

LAST EDIT: technically, both solutions work; as Viraj Tank put it, it's a question of "safe integration" vs. "deep integration".

Still, the proper deep-integration approach would be to keep the download from the service separate, and to have a subscriber that listens for changes in the underlying Realm (realmResults.asObservable().subscribe()).



I honestly can't help but feel like this is conceptually flawed.

First things first: the Realm query is executed on the main thread, at the moment the observable is created:

@Override
public Observable<Page> search(@NonNull final String query) {

        return Realm.getDefaultInstance().where(Page.class)

This also creates an instance of Realm that will never be closed.

Additionally, it uses asObservable() in conjunction with first(), which makes me wonder why you add change listeners to your results in the first place through asObservable() rather than just calling Observable.just(results).

Then, it seems like the remote data source obtains the element, adds it to Realm, and shows the downloaded item immediately, rather than having managed elements supplied by Realm through a change listener, which would provide auto-updates. In that case, I'm not really sure what Realm is doing here.

Anyways, my initial guess would be that you might be able to make your code work with the following lines:

final Observable<Page> localResult = mSearchLocalDataSource.search(query)
                                                           .subscribeOn(AndroidSchedulers.mainThread());
final Observable<Page> remoteResult = mSearchRemoteDataSource.search(query)
            .subscribeOn(Schedulers.io())
            .doOnNext(new Action1<Page>() {
                @Override
                public void call(Page page) {
                    //mSearchLocalDataSource.save(query, page);
                    //mResultCache.put(query, page);
                }
            });

Considering you don't seem to be relying on Realm's auto-updating feature, you could consider using realm.copyFromRealm(obj) to create an unmanaged copy which can be passed between threads.
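
The idea of an unmanaged copy can be sketched in plain Java without Realm. `ManagedPage`, `PageSnapshot`, and `detach()` below are hypothetical stand-ins (for a managed object, its copy, and `copyFromRealm()` respectively), not Realm's API; the sketch only shows why a detached copy can cross threads while the confined original cannot.

```java
import java.util.concurrent.CompletableFuture;

/** Plain value object: it keeps no link to the store, so any thread may read it. */
final class PageSnapshot {
    final String title;
    final String content;

    PageSnapshot(String title, String content) {
        this.title = title;
        this.content = content;
    }
}

/** Hypothetical thread-confined object, standing in for a managed Realm object. */
final class ManagedPage {
    private final Thread owner = Thread.currentThread();
    private final String title;
    private final String content;

    ManagedPage(String title, String content) {
        this.title = title;
        this.content = content;
    }

    private void checkThread() {
        if (Thread.currentThread() != owner) {
            throw new IllegalStateException("access from incorrect thread");
        }
    }

    String getTitle() {
        checkThread();
        return title;
    }

    /** Copies the fields into a snapshot; must run on the owning thread. */
    PageSnapshot detach() {
        checkThread();
        return new PageSnapshot(title, content);
    }
}

final class DetachedCopyDemo {
    /** Creates and detaches a page on a background thread, then returns the copy. */
    static PageSnapshot loadInBackground(String title, String content) {
        return CompletableFuture
                .supplyAsync(() -> new ManagedPage(title, content).detach())
                .join();
    }

    public static void main(String[] args) {
        PageSnapshot copy = loadInBackground("Realm", "Some content");
        // The caller's thread can read the copy even though it did not create it.
        System.out.println(copy.title + ": " + copy.content);
    }
}
```

The trade-off is the one described above: the copy is a snapshot, so it will not reflect later writes to the store.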


But in reality, for proper Realm usage, you should have two subscriptions: a single data flow from the network into the Realm, and a subscription to RealmResults<Page>.asObservable(), which notifies you whenever the network observable writes pages to the underlying Realm. Check out Christian Melchior's post for the idea.
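
A rough plain-Java sketch of that two-flow shape follows. It is single-threaded and uses neither Realm nor RxJava; `PageStore`, `observe()`, and `download()` are hypothetical names that stand in for the Realm, the results subscription, and the network flow.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical in-memory store that notifies listeners after every write. */
final class PageStore {
    private final Map<String, String> pages = new LinkedHashMap<>();
    private final List<Consumer<Map<String, String>>> listeners = new ArrayList<>();

    /** Registers a listener and immediately delivers the current contents. */
    void observe(Consumer<Map<String, String>> listener) {
        listeners.add(listener);
        listener.accept(new LinkedHashMap<>(pages));
    }

    /** Write path: stores the page, then pushes a fresh snapshot to every listener. */
    void save(String query, String content) {
        pages.put(query, content);
        for (Consumer<Map<String, String>> listener : listeners) {
            listener.accept(new LinkedHashMap<>(pages));
        }
    }
}

final class TwoFlowsDemo {
    /** Flow 1: a (fake) network result is written to the store, never handed to the UI. */
    static void download(PageStore store, String query) {
        store.save(query, "content for " + query);
    }

    public static void main(String[] args) {
        PageStore store = new PageStore();
        List<Integer> seenSizes = new ArrayList<>();
        // Flow 2: the UI only watches the store and records each snapshot's size.
        store.observe(snapshot -> seenSizes.add(snapshot.size()));
        download(store, "realm");
        download(store, "rxjava");
        System.out.println(seenSizes); // prints [0, 1, 2]
    }
}
```

The point of the split is that the UI never receives the downloaded object directly; it only ever sees what the store reports.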

Personally, I skipped the Realm observable because the RealmRecyclerViewAdapter handled it. So if you're showing multiple elements in a RecyclerView, then the Realm observable is not even needed, because the RealmRecyclerViewAdapter manages its auto-updating through RealmChangeListener without relying on asObservable() to do it.



EDIT:

After forking the asker's repository as https://github.com/Zhuinden/wikilight, apparently I was right all along. The simple zero-copy solution would have been to add subscribeOn(AndroidSchedulers.mainThread()) for the local observable.

So surprisingly enough, not much changed.

    final Observable<Page> localResult = mSearchLocalDataSource.search(query).filter(new Func1<Page, Boolean>() {
        @Override
        public Boolean call(Page page) {
            return page != null;
        }
    }).subscribeOn(AndroidSchedulers.mainThread());
    final Observable<Page> remoteResult = mSearchRemoteDataSource.search(query).subscribeOn(Schedulers.io())
            .doOnNext(new Action1<Page>() {
                @Override
                public void call(Page page) {
                    if (page != null) {
                        mSearchLocalDataSource.save(query, page);
                      //  mResultCache.put(query, page);
                    }
                }
            });


    return Observable.concat(localResult, remoteResult)
            .first()
            .map(new Func1<Page, Page>() {
                @Override
                public Page call(Page page) {
                    if (page == null) {
                        throw new NoSuchElementException("No result found!");
                    }
                    return page;
                }
            });

But to be fair, the original solution seems to cut auto-updating out of the picture, so no RealmChangeListeners are used in the solution, and neither is RealmObject.asObservable(), so copyFromRealm() does make more sense. To make auto-updating work, this:

@Override
public Observable<Page> search(@NonNull final String query) {
    return Observable.create(new Observable.OnSubscribe<Page>() {
        @Override
        public void call(Subscriber<? super Page> subscriber) {
            Realm realm = null;
            try {
                realm = mRealmManager.getRealm();

                final Page page = realm.where(Page.class).equalTo("query", query).findFirst();
                if(page != null && page.isLoaded() && page.isValid()) {
                    Log.i("data from", "realm");
                    subscriber.onNext(page);
                } else {
                    Log.i("data is", "empty");
                    Observable.empty();
                }
                subscriber.onCompleted();
            } finally {
                if(realm != null) {
                    mRealmManager.closeRealm(realm);
                }
            }
        }
    });
}

Should be replaced with this:

@Override
public Observable<Page> search(@NonNull final String query) {
    Realm realm = mRealmManager.getRealm(); // UI thread only!
    final Page page = realm.where(Page.class).equalTo("query", query).findFirst();
    if(page != null) {
        Log.i("data from", "realm");
        return page.asObservable();
    } else {
        Log.i("data is", "empty");
        return Observable.empty();
    }
}

In the end, some additional architectural work could make this even better, but I think I'll just go to sleep.

Upvotes: 1

savepopulation

Reputation: 11921

After a lot of research, I found a solution.

First, let's recall the problem: when I subscribe on a Schedulers.io() thread and try to get data from Realm or Retrofit, I get "Realm access from incorrect thread. Realm objects can only be accessed on the thread they were created".

The main problem in this case is that I create the Realm instance on the main thread but try to access it from a worker thread.

We could subscribe to Realm on the main thread, but that is not good practice. When I use

realm.where(Page.class).equalTo("query", query).findFirstAsync().asObservable();

as in the example in the GitHub repo, I get stuck at

Observable.concat(localResult, remoteResult).first();

What's my solution?

In our repository implementation, we still have two observables, one local and one remote, as shown below:

final Observable<Page> localResult = mSearchLocalDataSource.search(query);
final Observable<Page> remoteResult = mSearchRemoteDataSource.search(query)
                .doOnNext(new Action1<Page>() {
                    @Override
                    public void call(Page page) {
                        if (page != null) {
                            mSearchLocalDataSource.save(query, page);
                            mResultCache.put(query, page);
                        }
                    }
                });

Note that we save the data to Realm and cache it in memory when we get it from the remote source.

We still concat the two observables if we cannot get the data from the cache:

return Observable.concat(localResult, remoteResult)
                .first()
                .map(new Func1<Page, Page>() {
                    @Override
                    public Page call(Page page) {
                        if (page == null) {
                            throw new NoSuchElementException("No result found!");
                        }
                        return page;
                    }
                });

With concat, we first try to get the data from Realm, and if we can't, we fall back to the remote source.
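
The local-then-remote fallback can be sketched in plain Java as follows. This is not RxJava: `FirstAvailable` and `firstOf()` are hypothetical names, each source is modelled as a `Supplier<Optional<T>>`, and the sketch only mirrors the behaviour relied on above (sources are tried in order, later ones are skipped once a value is found, and an error is raised if all are empty).

```java
import java.util.Arrays;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.function.Supplier;

final class FirstAvailable {
    /**
     * Queries the sources in order and returns the first value found.
     * Sources after the first hit are never queried.
     */
    static <T> T firstOf(List<Supplier<Optional<T>>> sources) {
        for (Supplier<Optional<T>> source : sources) {
            Optional<T> result = source.get();
            if (result.isPresent()) {
                return result.get();
            }
        }
        throw new NoSuchElementException("No result found!");
    }

    public static void main(String[] args) {
        // An empty local source stands for a cache miss.
        Supplier<Optional<String>> local = () -> Optional.empty();
        Supplier<Optional<String>> remote = () -> Optional.of("page from remote");
        System.out.println(firstOf(Arrays.asList(local, remote)));
    }
}
```

For this to work in the observable version, the local source has to finish when it has nothing to offer; a source that neither emits nor completes would leave the chain waiting.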

Here's the remote observable implementation:

@Override
public Observable<Page> search(@NonNull String query) {
        return mWikiServices.search(query).flatMap(new Func1<Result, Observable<Page>>() {
            @Override
            public Observable<Page> call(Result result) {
                final ArrayList<Page> pages = new ArrayList<>(result.getQuery().getPages().values());
                Log.i("data from", "remote");
                return Observable.from(pages).first();
            }
        });
    }

Here's the local source implementation:

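@Override
public Observable<Page> search(@NonNull final String query) {
    return Observable.create(new Observable.OnSubscribe<Page>() {
        @Override
        public void call(Subscriber<? super Page> subscriber) {
            // Open, query, and close the Realm on the subscribing (io) thread.
            final Realm realm = Realm.getInstance(mRealmConfiguration);
            try {
                final Page page = realm.where(Page.class)
                        .equalTo("query", query)
                        .findFirst();
                if (page != null && page.isLoaded() && page.isValid()) {
                    Log.i("data from", "realm");
                    // Emit an unmanaged copy that can be passed to other threads.
                    subscriber.onNext(realm.copyFromRealm(page));
                }
                // With no match, completing without emitting makes this an
                // empty observable, so concat moves on to the remote source.
                subscriber.onCompleted();
            } finally {
                // Close the Realm even if the query or the subscriber throws.
                realm.close();
            }
        }
    });
}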

The point is that I create a new Observable and get the data from Realm inside it, so the Realm instance is created and used on the same thread (the io thread). We emit a copy of the object to avoid the illegal state exception.

If Realm returns null, we complete without emitting anything (an empty observable), so that the concat operation doesn't get stuck.

If we get a page and it's valid and loaded, we send it to the subscriber and complete the operation.
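
The open, query, close-on-the-same-thread pattern described above can also be written with try-with-resources, which closes the handle even when the query throws. The sketch below is plain Java under stated assumptions: `StoreHandle` and `ScopedLookup` are hypothetical names, and `StoreHandle` merely stands in for a closeable Realm instance.

```java
import java.util.Optional;
import java.util.function.Supplier;

/** Hypothetical closeable handle, standing in for a Realm instance. */
final class StoreHandle implements AutoCloseable {
    private boolean closed;

    /** Returns the stored page for the query, or null if there is none. */
    String findFirst(String query) {
        if (closed) {
            throw new IllegalStateException("handle is already closed");
        }
        return "cached".equals(query) ? "cached page" : null;
    }

    boolean isClosed() {
        return closed;
    }

    @Override
    public void close() {
        closed = true;
    }
}

final class ScopedLookup {
    /**
     * Opens a handle, queries it, and closes it again, all on the calling thread.
     * An empty Optional plays the role of "complete without emitting".
     */
    static Optional<String> search(Supplier<StoreHandle> opener, String query) {
        try (StoreHandle handle = opener.get()) {
            return Optional.ofNullable(handle.findFirst(query));
        }
    }

    public static void main(String[] args) {
        System.out.println(search(StoreHandle::new, "cached"));
        System.out.println(search(StoreHandle::new, "missing"));
    }
}
```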

Here's how we can save the data we get from the remote source to Realm:

@Override
public void save(@NonNull String query, @NonNull Page page) {
        final Realm realm = Realm.getInstance(mRealmConfiguration);
        realm.beginTransaction();
        final Page p = realm.createObject(Page.class);
        p.setQuery(query);
        p.setId(page.getId());
        p.setTitle(page.getTitle());
        p.setContent(page.getContent());
        realm.copyToRealmOrUpdate(p);
        realm.commitTransaction();
        realm.close();
    }

Here's the example source code: https://github.com/savepopulation/wikilight

Good luck.

Upvotes: 1
