Tudor Luca

Reputation: 6419

Reuse RxJava stream from a remote API

I have an API call and I want to wrap it using Observable:

private Observable<RealmResults<Account>> getAccounts() {
        final Observable<RealmResults<Account>> realmAccounts =
                Observable.defer(new Func0<Observable<RealmResults<Account>>>() {
                    @Override
                    public Observable<RealmResults<Account>> call() {
                        return RealmObservable.results(getActivity(), new Func1<Realm, RealmResults<Account>>() {
                            @Override
                            public RealmResults<Account> call(Realm realm) {
                                return realm.where(Account.class).findAll();
                            }
                        });
                    }
                });

        return Observable
                .create(new Observable.OnSubscribe<RealmResults<Account>>() {
                    @Override
                    public void call(final Subscriber<? super RealmResults<Account>> subscriber) {
                        DataBridge.getAccounts(Preferences.getString(Constant.ME_GUID, ""), new OnResponseListener() {
                            @Override
                            public void OnSuccess(Object data) {
                                Log.d("Stream", "onSuccess");
                                realmAccounts.subscribe(subscriber);
                            }

                            @Override
                            public void onFailure(Object data) {
                                subscriber.onError(new Exception(data.toString()));
                            }
                        });
                    }
                })
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .startWith(realmAccounts);
    }

and I use it like

Observable<Account> accounts = getAccounts().flatMap(
                new Func1<RealmResults<Account>, Observable<Account>>() {
                    @Override
                    public Observable<Account> call(RealmResults<Account> accounts) {
                        return Observable.from(accounts);
                    }
                });

How can I use the accounts observable multiple times without calling the API each time? I need to process the stream of accounts and extract different sets of data from it.

Upvotes: 2

Views: 682

Answers (1)

Marek Hawrylczak

Reputation: 2504

The easiest method is to use the cache operator, which internally uses a ReplaySubject. It caches the items emitted by the source observable and serves them from the cache to later subscribers.

... 
Observable<RealmResults<Account>> cachedResult = getAccounts().cache();
Observable<Account> accountsObservable = cachedResult.flatMap(...);
Observable<X> xObservable = cachedResult.flatMap(...);
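
To make the effect of caching concrete, here is a minimal plain-Java model of the idea. It is not RxJava and not how cache is implemented internally; CachedSource, its Supplier-based source and the executions counter are hypothetical names used only for illustration. It shows that the expensive source runs once and every later subscriber is served from the stored items.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Plain-Java model of replay-style caching (hypothetical, not RxJava's implementation). */
class CachedSource<T> {
    private final Supplier<List<T>> source; // stands in for the expensive API call
    private List<T> cached;                 // filled on the first subscription
    private int executions = 0;             // how many times the source actually ran

    CachedSource(Supplier<List<T>> source) {
        this.source = source;
    }

    /** Runs the source on the first call only, then replays the stored items. */
    synchronized void subscribe(Consumer<T> onNext) {
        if (cached == null) {
            cached = new ArrayList<>(source.get());
            executions++;
        }
        cached.forEach(onNext);
    }

    synchronized int executions() {
        return executions;
    }
}
```

Subscribing twice to the same CachedSource delivers the same items to both consumers while executions() stays at 1, which is the behaviour the question asks for.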

If you would like to avoid caching the results, you should use a ConnectableObservable. This usually only matters for hot observables. A connectable observable does not begin emitting items until its connect method is called. You can use the publish operator to convert an observable into a connectable one.

ConnectableObservable<RealmResults<Account>> connectableObservable = getAccounts().publish();
Observable<Account> accountsObservable = connectableObservable.flatMap(...);
Observable<X> xObservable = connectableObservable.flatMap(...);
// You must subscribe before connect
accountsObservable.subscribe(...);
xObservable.subscribe(...);
// Start emitting data
connectableObservable.connect();

The important catch here is that you must subscribe before calling connect to avoid data loss. Otherwise you must use the replay operator, which is similar to cache but is used with connectable observables.
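
The ordering rule can be illustrated with a small plain-Java model. This is only a sketch under the assumption of a synchronous source with no replay; ConnectableSource is a hypothetical class, not part of RxJava. Subscribers registered before connect receive every item, while one that arrives afterwards receives nothing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Plain-Java model of a connectable, non-replaying multicast source (hypothetical). */
class ConnectableSource<T> {
    private final List<T> items;
    private final List<Consumer<T>> subscribers = new ArrayList<>();

    ConnectableSource(List<T> items) {
        this.items = items;
    }

    /** Registers a subscriber; nothing is emitted yet. */
    void subscribe(Consumer<T> onNext) {
        subscribers.add(onNext);
    }

    /** Emits every item once to the subscribers registered so far. */
    void connect() {
        for (T item : items) {
            for (Consumer<T> subscriber : subscribers) {
                subscriber.accept(item);
            }
        }
    }
}
```

In this model a late subscriber is simply never called, which is the data loss that replay is meant to prevent.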

And what about share?

It creates a ConnectableObservable and exposes it as a regular Observable. The first subscription automatically triggers the connection and the emission of items.

Used in your case without replay, share may cause data loss or multiple executions, depending on timing. For example, with 2 subscribers and one item in the stream you may have the following cases:

  1. Both subscriptions are created before onNext: works as expected.
  2. The second subscription is created after onNext but before onCompleted: the second subscription gets only onCompleted.
  3. The second subscription is created after onCompleted: 2 executions without caching.
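
The three cases can be stepped through by hand with the following plain-Java model. It is a hypothetical sketch, not RxJava: SharedSource, emitItem and complete are invented names, and the timing that a real stream decides on its own is driven manually here. The only assumptions are that the first subscriber of a run starts an execution and that nothing is replayed.

```java
import java.util.ArrayList;
import java.util.List;

/** Hand-stepped model of share-like multicasting without replay (hypothetical, not RxJava). */
class SharedSource {
    private final List<List<String>> subscribers = new ArrayList<>();
    private int executions = 0;

    /** The first subscriber of a run starts a new execution of the source. */
    void subscribe(List<String> events) {
        if (subscribers.isEmpty()) {
            executions++;
        }
        subscribers.add(events);
    }

    /** Delivers the single item to whoever is subscribed right now. */
    void emitItem() {
        for (List<String> events : subscribers) {
            events.add("onNext");
        }
    }

    /** Completes the run and drops all subscribers, so the next subscribe starts over. */
    void complete() {
        for (List<String> events : subscribers) {
            events.add("onCompleted");
        }
        subscribers.clear();
    }

    int executions() {
        return executions;
    }
}
```

Calling subscribe, emitItem, subscribe, complete reproduces case 2: the second event list contains only onCompleted. Calling subscribe, emitItem, complete, subscribe reproduces case 3: executions() reports 2.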

Upvotes: 3
