Reputation: 12467
I have a source Observable<Void>
that:
BroadcastReceivers
when its subscribed toBroadcastReceivers
twice)How do I accomplish this?
Upvotes: 1
Views: 219
Reputation: 12467
The .share() operator
Observable<Void> src = ...;
Observable<Void> shareable = src
.doOnSubscribe(() -> {...})
.doOnUnsubscribe(() -> {...})
.share() //<------------ or publish.refcount(1)
Note: There's a race condition here for additional subscribers. if you are expecting to receive an event when you subscribe initially for every subscriber, use .replayingShare()
from https://github.com/JakeWharton/RxReplayingShare - this caches the latest value while still unsubscribing from the source when it has no subscribers.
Upvotes: 1
Reputation: 12087
When you want to do subscriber-specific things defer
is a really useful operator:
Observable<Integer> source = Observable.just(1, 2, 3);
// control subscribers via this atomic boolean
AtomicBoolean subscribed = new AtomicBoolean(false);
Observable<Integer> o = Observable.defer(() -> {
if (subscribed.compareAndSet(false, true)) {
return source.doOnUnsubscribe(() -> subscribed.set(false));
} else {
return Observable.never();
}
});
//Let's test it!
TestSubscriber<Integer> ts = TestSubscriber.create(0);
o.subscribe(ts);
ts.requestMore(1);
ts.assertValue(1);
ts.assertNoTerminalEvent();
TestSubscriber<Integer> ts2 = TestSubscriber.create();
o.subscribe(ts2);
ts2.assertNoValues();
ts2.assertNoTerminalEvent();
ts.requestMore(1);
ts.assertValues(1, 2);
ts.assertNoTerminalEvent();
ts.unsubscribe();
TestSubscriber<Integer> ts3 = TestSubscriber.create();
o.subscribe(ts3);
ts3.assertValues(1, 2, 3);
ts3.assertCompleted();
Upvotes: 0
Reputation: 16142
Not a guaranteed solution, but maybe this:
Observable<Void> src = ...;
Observable<Void> shareable = src
.doOnSubscribe(() -> {...})
.doOnUnsubscribe(() -> {...})
.publish()
.autoConnect();
Then you subscribe to the shareable
Observable. Please test first!
Upvotes: 0