Reputation: 638
My use case for this is click listeners in a view group:
Each View publishes a cold Observable<Click> API.
A view group manages a collection of views and provides an Observable<ViewClick> API that emits clicks from child views.
I want each child view to be subscribed to when added to the composite observable, but only if the composite has a subscription. If not, the child should be subscribed to only when the composite is subscribed to.
So far, my solution is:
PublishSubject<ViewClick> composite = PublishSubject.create();
Map<View, Subscription> subscriptions = new HashMap<>();

void addSource(View view, Observable<Click> clicks) {
    // Subscribes immediately, whether or not anyone observes the composite.
    Subscription s = clicks.map(click -> new ViewClick(view, click))
                           .subscribe(composite::onNext);
    subscriptions.put(view, s);
}

void removeSource(View v) {
    Subscription s = subscriptions.remove(v);
    if (s != null) {
        s.unsubscribe();
    }
}

Observable<ViewClick> compositeClicks() {
    return composite;
}
However, this means the clicks observables are no longer cold, as they're subscribed to regardless of whether anything is subscribed to compositeClicks.
Is there a better way?
Upvotes: 3
Views: 766
Reputation: 2962
Your difficulties come from the fact that addSource/removeSource aren't reactive; they will go away if you turn them into Rx streams.
Several ways to do that:
1. IObservable<IObservable<View>>, where each inner observable emits a single onNext upon activation/add and calls onComplete upon removal.
2. IObservable<View> x2 (one for add, one for removal).
3. IObservable<ViewActivationEvent> (a tuple with the add/remove kind + the view instance).
(This assumes your click stream can be extracted from the view; otherwise you need to provide a view + clickStream tuple in place of the view for the activation part.)
Here's what implementing your stream looks like with #2 above, though any of these approaches can work:
Subject<View> add, remove;
// Each added view contributes its clicks until the same view shows up on the remove stream.
clicks = add.flatMap(view -> getClickObservable(view).takeUntil(remove.filter(v -> v == view)));
Upvotes: 0
Reputation: 69997
You need a special "warm" Subject, such as BufferUntilSubscriber, that holds onto the source Observables until a single consumer subscribes to it. Unfortunately, that class isn't part of the official API; however, 2.0 will have an official UnicastSubject for the very same purpose:
UnicastSubject<Observable<Integer>> inputs = UnicastSubject.create();
Observable<String> p = inputs
.flatMap(v -> v.map(u -> u.toString()))
.publish()
.autoConnect();
inputs.onNext(Observable.just(1)
.doOnSubscribe(s -> System.out.println("Subscribed to 1")));
inputs.onNext(Observable.just(2)
.doOnSubscribe(s -> System.out.println("Subscribed to 2")));
inputs.onNext(Observable.just(3)
.doOnSubscribe(s -> System.out.println("Subscribed to 3")));
System.out.println("Subscribing to p");
p.subscribe(System.out::println);
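To make the "hold the sources until someone subscribes" behaviour concrete, here is a minimal dependency-free sketch of the semantics the question asks for. It does not use RxJava: Source, LazyComposite, and LazyCompositeDemo are hypothetical stand-ins invented for illustration, and the sketch assumes a single consumer and single-threaded use.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical stand-in for a cold observable: subscribing returns a cancel handle. */
interface Source<T> {
    Runnable subscribe(Consumer<T> observer);
}

/** Holds sources and subscribes to them only while a downstream observer is attached. */
final class LazyComposite<K, T> {
    private final Map<K, Source<T>> sources = new LinkedHashMap<>();
    private final Map<K, Runnable> active = new LinkedHashMap<>();
    private Consumer<T> downstream; // null while nobody is subscribed

    void addSource(K key, Source<T> source) {
        removeSource(key); // replace any previous source registered under this key
        sources.put(key, source);
        if (downstream != null) {
            // Someone is already listening, so subscribe right away.
            active.put(key, source.subscribe(downstream));
        }
    }

    void removeSource(K key) {
        sources.remove(key);
        Runnable cancel = active.remove(key);
        if (cancel != null) {
            cancel.run();
        }
    }

    /** Single-consumer subscribe; the returned handle cancels every active source. */
    Runnable subscribe(Consumer<T> observer) {
        if (downstream != null) {
            throw new IllegalStateException("only one subscriber is supported");
        }
        downstream = observer;
        // Sources added earlier are subscribed to only now.
        sources.forEach((key, source) -> active.put(key, source.subscribe(observer)));
        return () -> {
            active.values().forEach(Runnable::run);
            active.clear();
            downstream = null;
        };
    }
}

final class LazyCompositeDemo {
    /** A source that logs subscribe/cancel and emits one value when subscribed to. */
    static Source<String> logged(String name, List<String> log) {
        return observer -> {
            log.add("subscribed " + name);
            observer.accept("click " + name);
            return () -> log.add("cancelled " + name);
        };
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        LazyComposite<String, String> composite = new LazyComposite<>();
        composite.addSource("a", logged("a", log)); // held, not yet subscribed
        log.add("subscribing");
        Runnable cancel = composite.subscribe(log::add);
        composite.addSource("b", logged("b", log)); // subscribed immediately
        composite.removeSource("a");
        cancel.run();
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In this sketch, source "a" is only subscribed to after the "subscribing" entry is logged, while source "b", added later, is subscribed to at once. In real RxJava code the subject-based approach above is preferable, since it also handles threading and completion, which this sketch ignores.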
Upvotes: 1