Andy Dennie
Andy Dennie

Reputation: 6062

Best practices for exposing "expensive" Observables in RxJava

I'm new to RxJava and trying to determine common idioms and best practices.

Say I've got a Foo class that emits Bars (incomplete and oversimplified for the moment):

class Foo {
    public Subscriber barSubscriber;
    public Observable<Bar> getBarObservable = (...details omitted...)

    private void someMethod() {
        // emit a Bar
        barSubscriber.onNext(bar); 
    }
}

Other objects that want to subscribe to those Bars do so by calling

foo.getBarObservable().subscribe(...);

Let's say that producing and emitting Bars is "expensive". To avoid doing this when there are no more subscribers, Foo's getBarObservable could expose a connectable, ref-counted Observable like so (using share()):

class Foo {
    private Subscriber barSubscriber;
    private Observable<Bar> barObservable =  Observable.create(
            new Observable.OnSubscribe<Bar>() {
                @Override
                public void call(Subscriber<? super Bar> subscriber) {
                    Foo.this.subscriber = subscriber;
                    new Thread(new Runnable() {
                        @Override
                        public void run() {
                            runUntilUnsubscribed();
                        }
                    }).start();

                }
            }
    ).share();

    public Observable<Bar> getBarObservable() {
        return barObservable;
    }

    public void runUntilUnsubscribed(} {
        while(!subscriber.isUnsubscribed()) {

            /* do some heavy stuff that produces a Bar.  If, when a 
               Bar is ready, we still have subscribers, emit the Bar */

            if (!subscriber.isUnsubscribed()) 
                subscriber.onNext(bar);
        }
    }
}

Most of the examples and tutorials I've seen create Observables inline on-the-fly in the same chunk of code that subscribes to them, so it's not clear to me what the standard practices are in the more real-world scenario where the creation of the Observable and the subscription to it are in two different places.

  1. For a class like Foo that doesn't want to know who its subscribers will be or how many subscribers it will have, is this the right approach?
  2. It seems to me that this would be a very typical scenario - is it? Or, at a high level, is this not the right way to think about exposing Observables? Are there drawbacks to using this approach routinely?
  3. It seems to me I need that little if (subscriber == null && !subscriber.isUnsubscribed()) subscriber.onNext(bar); pattern every time I want to emit a Bar. Is that also a common idiom, or is there a better way? Nevermind, I don't need the null check..., not sure what I was thinking there.

Upvotes: 3

Views: 2635

Answers (2)

akarnokd
akarnokd

Reputation: 69997

Your example class can't really work: setBar can throw NPE if subscriber is null, the runUntilUnsubscribed references a missing bar field/value and is a busy loop that would emit the same value over and over.

You say creating a Bar is expensive, but its creation seems to be outside the Foo class and I guess you'd want to dispatch such value to the currently subscribed Subscribers. That is what PublishSubject is for:

class Foo {
    final PublishSubject<Bar> subject = PublishSubject.create();
    public void setBar(Bar bar) {
        subject.onNext(bar);
    }
    public Observable<Bar> getBarObservable() {
        return subject; // .asObservable() if you want to hide the subject
    }
}

If there aren't any subscribers, the bar set will just fall out and get garbage collected. If you'd like to retain the last value, use BehaviorSubject instead of a PublishSubject.

Otherwise, if you need to trigger the creation of the expensive Bar values when the subscriber arrives, you can use some jump-starting sequence with share():

Observable.just(1)
.subscribeOn(Schedulers.computation())
.map(v -> createBar())
.share();

But the use of share() really depends on the intended lifecycle of each Bar value.

For example, if you want to store the bar until subscribers arrive, then do the heavy computations once and dispatch the results, you can have the following construct:

class Foo {
    final BehaviorSubject<Bar> subject = BehaviorSubject.create();
    final Observable<Bar> output = subject
        .observeOn(Schedulers.computation())
        .doOnNext(bar -> expensiveInplaceComputation(bar))
        .take(1)
        .share();

    public void setBar(Bar bar) {
        subject.onNext(bar);
    }
    public Observable<Bar> getBarObservable() {
        return output;
    }
}

See this gist for a runnable example.

Upvotes: 3

Andr&#233; Staltz
Andr&#233; Staltz

Reputation: 13994

  1. Yes it is roughly the correct approach. If bar in Foo needs to be shared to all subscribers, then use .publish().refCount() (or share() as you said). If not, then use a common Observable, which is by default "cold".

  2. Exposing Observables is a common scenario. In a good reactive architecture most classes have only getters of Observables, because setters are inherently not reactive. Given a program or a class that works with setters, you can usually convert it to Observables and getters without affecting functionality. Observables and getters are a desirable approach because of some inversion of control. With setters, if Foo sets a value in Baz, you need to look at class Foo whenever you want to understand Baz. But with Observables and getters, Baz gets from Foo and Baz defines itself how it works, and Foo can be oblivious of Baz.

  3. I haven't ever needed to use that if pattern. Also I rarely need Observable.create(). There are many Observable creation helpers (from, interval, range, just, to name a few) and Observable transformations (such as the all-powerful flatMap) that allow you to get very far in expressing new Observables. Also Subjects allow you to manually create Observables on the go.

Upvotes: 3

Related Questions