elnino
elnino

Reputation: 245

Realm - reactive extension RxJava2

I am using Realm in conjunction with RxJava2. Must the instance of Realm be closed by programmer or is closed automatically?

I have a problems with this chain:

 return Observable.merge(
      getEvents
        .toObservable()
      ,
      mChangeEventsNotification
        .flatMapMaybe(notification ->
          getEvents
          .firstElement()
        )
    )

Realm throws java.lang.IllegalStateException: This Realm instance has already been closed, making it unusable.

Implementation of getEvents is same as getList().

I watched implementation RealmObservableFactory.

Can operator firstElement close Realm Instance for first Observable of merge operator?

Realm instance is shared between all Observers within one thread?

It looks as bug of Realm version 4.3.3. When I downgrade to version 4.1.0 everythink is ok. Maybe problem of ref count.

Upvotes: 0

Views: 130

Answers (1)

EpicPandaForce
EpicPandaForce

Reputation: 81539

To answer your question, I'll ask the source code for the Rx integration:

public Flowable<RealmResults<E>> asFlowable() {
    if (realm instanceof Realm) {
        return realm.configuration.getRxFactory().from((Realm) realm, this);

Where RxObservableFactoryis by default RealmObservableFactory, which does:

@Override
public <E> Flowable<RealmResults<E>> from(final Realm realm, final RealmResults<E> results) {
    final RealmConfiguration realmConfig = realm.getConfiguration();
    return Flowable.create(new FlowableOnSubscribe<RealmResults<E>>() {
        @Override
        public void subscribe(final FlowableEmitter<RealmResults<E>> emitter) throws Exception {
            // Gets instance to make sure that the Realm is open for as long as the
            // Observable is subscribed to it.
            final Realm observableRealm = Realm.getInstance(realmConfig);
            resultsRefs.get().acquireReference(results);
            final RealmChangeListener<RealmResults<E>> listener = new RealmChangeListener<RealmResults<E>>() {
                @Override
                public void onChange(RealmResults<E> results) {
                    if (!emitter.isCancelled()) {
                        emitter.onNext(results);
                    }
                }
            };
            results.addChangeListener(listener);

            // Cleanup when stream is disposed
            emitter.setDisposable(Disposables.fromRunnable(new Runnable() {
                @Override
                public void run() {
                    results.removeChangeListener(listener);
                    observableRealm.close();
                    resultsRefs.get().releaseReference(results);
                }
            }));

            // Emit current value immediately
            emitter.onNext(results);

        }
    }, BACK_PRESSURE_STRATEGY);
}

In which case what happens is that the Realm instance of the RealmResults you called asFlowable() on is used to obtain its RealmConfiguration, and the ref count is increased on the thread where the Flowable is created (which is the UI thread or a handler thread).

This means that even the following scenario would work:

public Flowable<List<MyObject>> getList() {
    try(Realm realm = Realm.getDefaultInstance()) {
        return realm.where(MyObject.class)
                    .findAllAsync()
                    .asFlowable()
                    .filter(RealmResults::isLoaded);
    } // auto-close
}

And the Realm associated with this Flowable would stay open for as long as you don't unsubscribe from the Flowable as well.


To answer your question, yes, you do need to close the Realm instance you obtain the RealmResults from somewhere.

Upvotes: 1

Related Questions