j2emanue

Reputation: 62519

RxJava - Do subscribers only run once?

I am confused about subscribers and when they react to observables. Let's say I have the following simple observable with a subscriber that performs an action:

Observable.just(preferences.getBoolean("vibrate", false))
            .subscribeOn(Schedulers.io()) // run the subscription on an I/O thread
            .observeOn(AndroidSchedulers.mainThread()) // deliver results on the main thread
            .subscribe(new Action1<Boolean>() {
                @Override
                public void call(Boolean shouldVibrate) {
                    if (shouldVibrate)
                        Toast.makeText(context, "i should vibrate now", Toast.LENGTH_SHORT).show();
                }
            });

I realize the subscriber gets called right away when this code first runs. But if the shared preference changes afterwards, will this code run again automatically, or does it only run each time I call subscribe? What if I wanted it to run every time the shared preference was altered (sort of like a watcher)?

Upvotes: 1

Views: 4366

Answers (1)

lopar

Reputation: 2442

It really depends on the observable. I would suggest reading about "Hot" and "Cold" Observables in the ReactiveX Observable docs.

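In your case, this is a Cold observable: it starts its work over for each subscriber, every time it is subscribed to. However, you only subscribe to it once, so it emits a single value. Note also that preferences.getBoolean(...) is evaluated when Observable.just(...) is called, on the calling thread, so your snippet actually blocks on the preferences fetch (probably not a huge problem) and captures only that one value.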
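The difference between a value that is read once up front and a value that is re-read for each subscriber can be tried out without RxJava. The following is a plain-Java analogy only, not RxJava code: a Supplier stands in for the observable, calling get() stands in for subscribing, and the names EagerVsDeferred, justLike, and deferLike are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Plain-Java analogy only: a Supplier stands in for an Observable,
// and calling get() stands in for subscribing.
class EagerVsDeferred {

    // Like Observable.just(value): the value was read before this call,
    // so every "subscription" sees the same captured value.
    static Supplier<Boolean> justLike(final boolean capturedValue) {
        return () -> capturedValue;
    }

    // Like Observable.defer(factory): the read happens on each get().
    static Supplier<Boolean> deferLike(final Map<String, Boolean> prefs, final String key) {
        return () -> prefs.getOrDefault(key, false);
    }

    public static void main(String[] args) {
        Map<String, Boolean> prefs = new HashMap<>();
        prefs.put("vibrate", false);

        Supplier<Boolean> eager = justLike(prefs.get("vibrate"));
        Supplier<Boolean> deferred = deferLike(prefs, "vibrate");

        prefs.put("vibrate", true); // the preference changes afterwards

        System.out.println("eager sees:    " + eager.get());
        System.out.println("deferred sees: " + deferred.get());
    }
}
```

The eager supplier keeps reporting the value captured before the change, while the deferred one reads the current value each time it is asked. Neither one reacts to the change by itself, which is why a change listener is still needed for watcher-like behavior.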

I would suggest using the ContentObservable class in the RxAndroid extension lib for RxJava, which you are already using (because of AndroidSchedulers).

It would look something like this (this is back-of-napkin code; I have not compiled or run it):

// Defer the observable so it gets a fresh preference value. Also, we'll
// be using it a few times.
final Observable<Boolean> vibratePreference = Observable.defer(
    new Func0<Observable<Boolean>>() {

      @Override
      public Observable<Boolean> call() {
        return Observable.just(preferences.getBoolean("vibrate", false));
      }

    });

vibratePreference
    .concatWith(ContentObservable.fromSharedPreferencesChanges(preferences)
        // Only consider changes to the vibrate preference.
        .filter(new Func1<String, Boolean>() {

          @Override
          public Boolean call(final String key) {
            return "vibrate".equals(key);
          }

        })
        // Each time the preference changes, get the latest value.
        .flatMap(new Func1<String, Observable<Boolean>>() {

          @Override
          public Observable<Boolean> call(final String unusedKey) {
            return vibratePreference;
          }

        }))
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe( /* ...and so on. */ );
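The shape of that pipeline (an initial value, then a fresh read on every matching change) can also be exercised without Android or RxJava. The sketch below is a plain-Java analogy under that assumption; FakePreferences and watch are hypothetical names and are not part of SharedPreferences or RxAndroid.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical in-memory stand-in for SharedPreferences plus its change listener.
class FakePreferences {
    private final Map<String, Boolean> values = new HashMap<>();
    private final List<Consumer<String>> keyListeners = new ArrayList<>();

    boolean getBoolean(String key, boolean defaultValue) {
        return values.getOrDefault(key, defaultValue);
    }

    void putBoolean(String key, boolean value) {
        values.put(key, value);
        // Notify with the changed key, as a preference change listener would.
        for (Consumer<String> listener : new ArrayList<>(keyListeners)) {
            listener.accept(key);
        }
    }

    // Emits the current value once, then the latest value after each change
    // to watchedKey. Returns a Runnable that stops further notifications.
    Runnable watch(final String watchedKey, final Consumer<Boolean> onValue) {
        onValue.accept(getBoolean(watchedKey, false));          // initial value
        Consumer<String> listener = changedKey -> {
            if (watchedKey.equals(changedKey)) {                 // the filter step
                onValue.accept(getBoolean(watchedKey, false));   // the re-read step
            }
        };
        keyListeners.add(listener);
        return () -> keyListeners.remove(listener);
    }

    public static void main(String[] args) {
        FakePreferences prefs = new FakePreferences();
        Runnable stop = prefs.watch("vibrate", v -> System.out.println("vibrate = " + v));
        prefs.putBoolean("sound", true);    // ignored: different key
        prefs.putBoolean("vibrate", true);  // reported
        stop.run();                         // comparable to unsubscribing
        prefs.putBoolean("vibrate", false); // no longer reported
    }
}
```

The Runnable returned by watch plays the role that a Subscription plays in RxJava: once it has run, later changes are no longer delivered.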

Also, if you are doing this in an activity or a fragment, I would strongly suggest looking into bindActivity and bindFragment in AppObservable in RxAndroid to make sure you are binding this observable to the lifecycle. You may also want to keep a CompositeSubscription that you clear in onPause, and then re-create the subscriptions in onResume. Those are slightly off-topic but will most likely be useful very soon.

Upvotes: 2
