user3452758

Reputation: 638

How to "merge" observables at run time?

My use case for this is click listeners in a view group:

I want each child view to be subscribed to when added to the composite observable, but only if the composite has a subscription. If not, the child should be subscribed to only when the composite is subscribed to.

So far, my solution is:

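// Relays the clicks of every registered child view.
PublishSubject<ViewClick> composite = PublishSubject.create();
Map<View, Subscription> subscriptions = new HashMap<>();

void addSource(View view, Observable<Click> clicks) {
    // Subscribes immediately, even if nothing is listening to composite yet.
    Subscription s = clicks.map(click -> new ViewClick(view, click))
            .subscribe(composite::onNext);
    subscriptions.put(view, s);
}

void removeSource(View v) {
    // remove() returns the stored subscription, or null if v was never added.
    Subscription s = subscriptions.remove(v);
    if (s != null) {
        s.unsubscribe();
    }
}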

Observable<ViewClick> compositeClicks() {
    return composite;
}

However, this means the click observables no longer behave as cold observables: they are subscribed to as soon as they are added, regardless of whether anything has subscribed to compositeClicks().

Is there a better way?

Upvotes: 3

Views: 766

Answers (2)

Gluck

Reputation: 2962

Your difficulties come from the fact that addSource/removeSource aren't reactive; they will go away if you turn them into Rx streams.

Several ways to do that:

  • Observable<Observable<View>>, where each inner observable emits a single onNext when its view is added and completes when the view is removed.
  • Two Observable<View> streams (one for additions, one for removals).
  • Observable<ViewActivationEvent> (a tuple of add/remove kind + view instance).

(This assumes the click stream can be obtained from the view; otherwise you need to emit a view + click stream tuple in place of the view on the add side.)

Here's what your stream would look like with the second option above, though any of them can work:

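PublishSubject<View> add = PublishSubject.create();
PublishSubject<View> remove = PublishSubject.create();

// Each added view contributes its clicks until the same view is emitted on remove.
Observable<Click> clicks = add.flatMap(view ->
        getClickObservable(view).takeUntil(remove.filter(v -> v == view)));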

Upvotes: 0

akarnokd

Reputation: 69997

You need a special "warm" Subject, such as BufferUntilSubscriber, that holds onto the source Observables until a single consumer subscribes to it. Unfortunately, that class isn't part of the official API; however, 2.0 will have an official UnicastSubject for the very same purpose:

UnicastSubject<Observable<Integer>> inputs = UnicastSubject.create();

Observable<String> p = inputs
        .flatMap(v -> v.map(u -> u.toString()))
        .publish()
        .autoConnect();

inputs.onNext(Observable.just(1)
    .doOnSubscribe(s -> System.out.println("Subscribed to 1")));
inputs.onNext(Observable.just(2)
    .doOnSubscribe(s -> System.out.println("Subscribed to 2")));
inputs.onNext(Observable.just(3)
    .doOnSubscribe(s -> System.out.println("Subscribed to 3")));

System.out.println("Subscribing to p");

p.subscribe(System.out::println);
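To make the "hold the sources until a consumer arrives" idea concrete without depending on any RxJava version, here is a rough plain-Java sketch of the behaviour. It is not how BufferUntilSubscriber or UnicastSubject are implemented; Source, LazyMerge, just and demo are hypothetical names invented for this illustration, and it assumes a single consumer and single-threaded use.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

public class LazyMergeSketch {

    /** Hypothetical cold source: does nothing until subscribe() is called; returns a cancel handle. */
    interface Source<T> {
        Runnable subscribe(Consumer<T> sink);
    }

    /** Hypothetical merge that holds its sources until a single consumer subscribes. */
    static final class LazyMerge<K, T> {
        private final Map<K, Source<T>> pending = new LinkedHashMap<>();
        private final Map<K, Runnable> active = new LinkedHashMap<>();
        private Consumer<T> consumer; // null while nobody is subscribed

        void addSource(K key, Source<T> source) {
            if (consumer == null) {
                pending.put(key, source); // hold on to it, do not subscribe yet
            } else {
                active.put(key, source.subscribe(consumer)); // consumer present: subscribe now
            }
        }

        void removeSource(K key) {
            pending.remove(key);
            Runnable cancel = active.remove(key);
            if (cancel != null) {
                cancel.run();
            }
        }

        void subscribe(Consumer<T> c) {
            if (consumer != null) {
                throw new IllegalStateException("only one consumer is supported");
            }
            consumer = c;
            // Subscribe to every held source, in the order it was added.
            pending.forEach((k, s) -> active.put(k, s.subscribe(c)));
            pending.clear();
        }
    }

    /** Source that logs its subscription, emits one value, and logs its cancellation. */
    static Source<Integer> just(int value, List<String> log) {
        return sink -> {
            log.add("subscribed to " + value);
            sink.accept(value);
            return () -> log.add("cancelled " + value);
        };
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();
        LazyMerge<String, Integer> merge = new LazyMerge<>();
        merge.addSource("a", just(1, log)); // held, not subscribed
        merge.addSource("b", just(2, log)); // held, not subscribed
        log.add("subscribing");
        merge.subscribe(v -> log.add("got " + v)); // held sources are subscribed here
        merge.addSource("c", just(3, log)); // subscribed immediately
        merge.removeSource("a");            // cancels only source "a"
        return log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

Running main logs "subscribing" before any "subscribed to ..." line, which is the property the question asks for: sources added early stay untouched until the composite itself has a consumer, while sources added later are subscribed right away.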

Upvotes: 1
