ZakTaccardi
ZakTaccardi

Reputation: 12467

How to have subscription side effects of source observable trigger only when the subscriber count changes from 0 to 1 or from 1 to 0

I have a source Observable<Void> that:

How do I accomplish this?

Upvotes: 1

Views: 219

Answers (3)

ZakTaccardi
ZakTaccardi

Reputation: 12467

The .share() operator

Observable<Void> src = ...;
Observable<Void> shareable = src
    .doOnSubscribe(() -> {...})
    .doOnUnsubscribe(() -> {...})
    .share()  //<------------ or publish.refcount(1)

Note: There's a race condition here for additional subscribers. if you are expecting to receive an event when you subscribe initially for every subscriber, use .replayingShare() from https://github.com/JakeWharton/RxReplayingShare - this caches the latest value while still unsubscribing from the source when it has no subscribers.

Upvotes: 1

Dave Moten
Dave Moten

Reputation: 12087

When you want to do subscriber-specific things defer is a really useful operator:

Observable<Integer> source = Observable.just(1, 2, 3);

// control subscribers via this atomic boolean
AtomicBoolean subscribed = new AtomicBoolean(false);
Observable<Integer> o = Observable.defer(() -> {
    if (subscribed.compareAndSet(false, true)) {
        return source.doOnUnsubscribe(() -> subscribed.set(false));
    } else {
        return Observable.never();
    }
});

//Let's test it!

TestSubscriber<Integer> ts = TestSubscriber.create(0);
o.subscribe(ts);
ts.requestMore(1);
ts.assertValue(1);
ts.assertNoTerminalEvent();

TestSubscriber<Integer> ts2 = TestSubscriber.create();
o.subscribe(ts2);
ts2.assertNoValues();
ts2.assertNoTerminalEvent();
ts.requestMore(1);
ts.assertValues(1, 2);
ts.assertNoTerminalEvent();

ts.unsubscribe();
TestSubscriber<Integer> ts3 = TestSubscriber.create();
o.subscribe(ts3);
ts3.assertValues(1, 2, 3);
ts3.assertCompleted();

Upvotes: 0

Tassos Bassoukos
Tassos Bassoukos

Reputation: 16142

Not a guaranteed solution, but maybe this:

Observable<Void> src = ...;
Observable<Void> shareable = src
    .doOnSubscribe(() -> {...})
    .doOnUnsubscribe(() -> {...})
    .publish()
    .autoConnect();

Then you subscribe to the shareable Observable. Please test first!

Upvotes: 0

Related Questions