Matej Drobnič

Reputation: 1191

How to properly transform multicasting observables in RxJava

Let's say I have an event-emitting data source that I want to transform into a reactive stream. The data source is bound to a resource (for example, a socket that periodically sends updated state), so I want to share a single subscription to that resource. Using a single observable with the replay (so that new subscribers immediately get the current value) and refCount operators seems well suited for that. For example, this is how the MyDataProvider singleton would look:

private final Observable<MyData> myDataObservable = Observable.<MyData>create(emitter -> {
    // Open my resource here and emit data into observable
})
    .doOnDispose(() -> {
        // Close my resource here
    })
    .replay(1)
    .refCount();

public Observable<MyData> getMyDataObservable() {
    return myDataObservable;
}

However, now let's say I have another data source that needs the result of the first data source to compute its own value:

private final Observable<AnotherData> anotherDataObservable = getMyDataProvider().getMyDataObservable()
    .flatMap(myData -> {
        // Call another data source and return the result here
    });

public Observable<AnotherData> getAnotherDataObservable() {
    return anotherDataObservable;
}

Here my setup starts to fall apart. Multicasting of the first observable only works up to the refCount operator. After that, everything is unicast again. That means that if two separate subscriptions to anotherDataObservable are made, the flatMap function is called twice for every item. I see two workarounds for this, but I dislike both:

1. Transform first observable before multicast happens

The simplest workaround seems to be to save the unicast variant of myDataObservable somewhere, before the multicast operators are applied, and then apply the multicast operators in anotherDataObservable. However, if those two observables are located in different modules, this workaround makes the code very inelegant, requiring MyDataProvider to expose two different observables that seemingly return the same data.

2. Just use duplicate multicast operators

The second workaround is to just apply the replay and refCount operators again in anotherDataObservable. But this seems inefficient, since the first multicast operator in myDataObservable is already applied but now does nothing except waste memory and CPU cycles.

Both workarounds also couple AnotherDataProvider to MyDataProvider. If MyDataProvider changes in the future and multicasting is no longer desired, I would also have to update AnotherDataProvider to remove the multicasting operators from there.

What would be a more elegant way to resolve this problem? Could I have architected this better to avoid the issue altogether?
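For reference, the behaviour described above (one shared source subscription, but flatMap running once per downstream subscriber) can be reproduced with a minimal sketch. It assumes RxJava 2 (the io.reactivex package); the class name, the Integer/String payloads and the counters are hypothetical stand-ins for MyData, AnotherData and the real resource.

```java
import io.reactivex.Observable;
import io.reactivex.disposables.Disposable;
import java.util.concurrent.atomic.AtomicInteger;

public class UnicastAfterRefCount {

    /** Returns {number of source subscriptions, number of flatMap calls}. */
    static int[] run() {
        AtomicInteger sourceOpens = new AtomicInteger();
        AtomicInteger flatMapCalls = new AtomicInteger();

        // Stand-in for myDataObservable: emits one value and stays open, like a socket would.
        Observable<Integer> shared = Observable.<Integer>create(emitter -> {
            sourceOpens.incrementAndGet();
            emitter.onNext(1);
        })
            .replay(1)
            .refCount();

        // Stand-in for anotherDataObservable: derived with no further multicasting.
        Observable<String> derived = shared.flatMap(value -> {
            flatMapCalls.incrementAndGet();
            return Observable.just("derived-" + value);
        });

        // Two independent subscriptions to the derived observable.
        Disposable first = derived.subscribe();
        Disposable second = derived.subscribe();
        first.dispose();
        second.dispose();

        return new int[] { sourceOpens.get(), flatMapCalls.get() };
    }

    public static void main(String[] args) {
        int[] counts = run();
        System.out.println("source subscriptions: " + counts[0]);
        System.out.println("flatMap calls: " + counts[1]);
    }
}
```

With this setup the source is subscribed once while the flatMap function runs twice (once for the live item, once for the replayed item), which is exactly the unicast-after-refCount effect in question.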

Upvotes: 7

Views: 1192

Answers (3)

Rafael Guillen

Reputation: 1673

You can split the unicast and multicast streams, but that is redundant. I think the second approach is better, and by the way, the replay and refCount operators do real work and are not a waste.

You convert myDataObservable into a ConnectableObservable when you call replay(1), which enables multicasting.
refCount() then subscribes to it internally and also provides a single point for subsequent subscriptions; after this point everything is unicast again.

What you really want to achieve in anotherDataObservable is the same, so, do exactly the same as in myDataObservable.
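A minimal sketch of that suggestion, assuming RxJava 2 (io.reactivex); the class name, payload types and counter are hypothetical. The derived observable gets its own replay(1).refCount(), so the flatMap function runs once per source item no matter how many observers subscribe.

```java
import io.reactivex.Observable;
import io.reactivex.disposables.Disposable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class MulticastDerived {

    /** Subscribes twice to the derived observable and returns everything received. */
    static List<String> run(AtomicInteger flatMapCalls) {
        // Stand-in for myDataObservable.
        Observable<Integer> shared = Observable.<Integer>create(emitter -> emitter.onNext(1))
            .replay(1)
            .refCount();

        // Stand-in for anotherDataObservable, multicast again after the transformation.
        Observable<String> derived = shared
            .flatMap(value -> {
                flatMapCalls.incrementAndGet();
                return Observable.just("derived-" + value);
            })
            .replay(1)
            .refCount();

        List<String> received = new ArrayList<>();
        Disposable first = derived.subscribe(received::add);
        Disposable second = derived.subscribe(received::add); // served from the replay buffer
        first.dispose();
        second.dispose();
        return received;
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        System.out.println(run(calls) + ", flatMap calls: " + calls.get());
    }
}
```

Both observers receive the derived value, while the flatMap function is invoked only once. The cost is one extra one-element replay buffer per multicast stage.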

Upvotes: 0

TpoM6oH

Reputation: 8585

Regarding your first approach: in the current setup, anotherDataObservable uses myDataObservable, and as I understand it they are logically coupled because they use the same source, so they need some shared base logic. I would extract that into a common module that exposes a unicast version of the observable, and then have myDataObservable and anotherDataObservable use it from their own modules, each adding its own multicast logic.

Another option is to have a class that monitors your resource by subscribing to it as in myDataObservable, does the processing in onNext, and publishes the mapped result through a Subject (e.g. a BehaviorSubject if you always want access to the last published value) and the raw result through another Subject. Clients subscribe to those subjects and get the mapped or raw values, which are calculated only once, in the monitoring class.
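A rough sketch of such a monitoring class, assuming RxJava 2 (io.reactivex). ResourceMonitor, rawValues(), mappedValues() and the "mapped-" transformation are hypothetical names chosen for illustration, and a PublishSubject stands in for the real resource.

```java
import io.reactivex.Observable;
import io.reactivex.disposables.Disposable;
import io.reactivex.subjects.BehaviorSubject;
import io.reactivex.subjects.PublishSubject;
import java.util.ArrayList;
import java.util.List;

public class ResourceMonitor {
    private final BehaviorSubject<Integer> raw = BehaviorSubject.create();
    private final BehaviorSubject<String> mapped = BehaviorSubject.create();
    private final Disposable upstream;
    private int mapCalls = 0;

    ResourceMonitor(Observable<Integer> source) {
        // Subscribes eagerly and does the mapping once per item, in onNext.
        // No onError handler here; a real class would need one.
        upstream = source.subscribe(value -> {
            raw.onNext(value);
            mapCalls++;
            mapped.onNext("mapped-" + value);
        });
    }

    // hide() prevents clients from casting back to the Subject and pushing values.
    Observable<Integer> rawValues() { return raw.hide(); }
    Observable<String> mappedValues() { return mapped.hide(); }

    void close() { upstream.dispose(); }

    /** Feeds two items, then attaches two clients; returns what they saw plus the call count. */
    static List<String> demo() {
        PublishSubject<Integer> source = PublishSubject.create();
        ResourceMonitor monitor = new ResourceMonitor(source);
        source.onNext(1);
        source.onNext(2);

        List<String> seen = new ArrayList<>();
        monitor.mappedValues().subscribe(seen::add); // gets the latest value immediately
        monitor.mappedValues().subscribe(seen::add);
        seen.add("mapCalls=" + monitor.mapCalls);
        monitor.close();
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Note the trade-off compared with refCount: this sketch holds the resource open from construction until close() is called, regardless of whether any client is subscribed.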

P.S. Remember to add a backpressure strategy to your Subject before you subscribe to it.

If those options do not suit you, consider whether it is really important to avoid calling flatMap multiple times. Your code is quite straightforward, and that is an important metric. If flatMap is not heavy, you can just let it run multiple times.

Upvotes: 3

katharmoi

Reputation: 134

You can use the "publish().refCount()" tandem to share a single subscription to the source. Because the two are used together so often, they have an alias: share().
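A small sketch of share(), assuming RxJava 2 (io.reactivex); the class name, the PublishSubject source and the counter are hypothetical. It also shows the difference from replay(1).refCount(): a late subscriber does not receive the item emitted before it subscribed.

```java
import io.reactivex.Observable;
import io.reactivex.subjects.PublishSubject;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class ShareDemo {

    /** Returns the values seen by an early and a late subscriber, in that order. */
    static List<List<String>> run(AtomicInteger mapCalls) {
        PublishSubject<Integer> source = PublishSubject.create();

        // The mapping sits upstream of share(), so it runs once per item.
        Observable<String> shared = source
            .map(value -> {
                mapCalls.incrementAndGet();
                return "v" + value;
            })
            .share(); // alias for publish().refCount()

        List<String> early = new ArrayList<>();
        List<String> late = new ArrayList<>();

        shared.subscribe(early::add);
        source.onNext(1);
        shared.subscribe(late::add); // joins after item 1; share() keeps no buffer
        source.onNext(2);

        return Arrays.asList(early, late);
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        System.out.println(run(calls) + ", map calls: " + calls.get());
    }
}
```

Here the early subscriber sees both items, the late one sees only the second, and the mapping function runs twice in total (once per item). If new subscribers must get the current value immediately, as in the question, replay(1).refCount() is the closer fit.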

You can also use a ConnectableObservable. But be careful while using replay with ConnectableObservables.

As the documentation states: "If you apply the Replay operator to an Observable before you convert it into a connectable Observable, the resulting connectable Observable will always emit the same complete sequence to any future observers, even those observers that subscribe after the connectable Observable has begun to emit items to other subscribed observers."

Upvotes: 1
