nalexn
nalexn

Reputation: 10791

RxSwift: share() alternative that guarantees single subscription on the upstream

I've always thought .share(replay: 1, scope: .forever) shares the single upstream subscription no matter how many downstream subscribers there are.

However, I've just discovered that if the count of the downstream subscriptions drops to zero, it stops "sharing" and releases the subscription on the upstream (because refCount() is used under the hood). So when a new downstream subscription happens, it has to re-subscribe on the upstream. In the following example:

let sut = Observable<Int>
    .create { promise in
        print("create")
        promise.onNext(0)
        return Disposables.create()
    }
    .share(replay: 1, scope: .forever)

sut.subscribe().dispose()
sut.subscribe().dispose()

I would expect create to be printed just once, but it gets printed twice. And if I remove .dispose() calls - just once.

How do I set up the chain where the upstream is guaranteed to be subscribed at most once?

Upvotes: 3

Views: 1140

Answers (3)

nalexn
nalexn

Reputation: 10791

I didn't like the leaking Disposable in the suggested solutions so came up with the following:

extension ObservableType {
    func shareReplayForever() -> Observable<Element> {
        let relay = BehaviorRelay<Element?>(value: nil)
        let disposeBag = DisposeBag()
        var subscribeOnce: () -> Void = {
            self.bind(to: relay).disposed(by: disposeBag)
        }
        return relay
            .compactMap { $0 }
            .do(onSubscribe: {
                subscribeOnce()
                subscribeOnce = { }
            }, onDispose: {
                _ = disposeBag
            })
    }
}

The trick is to capture the disposeBag in a downstream closure onDispose. As long as any code is holding a reference to the downstream observable (thus being capable to subscribe in the future), the disposeBag stays alive. However, it does get disposed when all the downstream observables are deallocated (no one down the stream can subscribe anymore - we can release the upstream)

Upvotes: 0

Daniel T.
Daniel T.

Reputation: 33967

The goal you describe implies you should be using multicast (or one of operators that use it, like publish(), replay(_:) or replayAll()) and not share...

let sut = Observable<Int>
    .create { observer in
        print("create")
        observer.onNext(0)
        return Disposables.create()
    }
    .replay(1)

let disposable = sut.connect() // subscription will stay alive until dispose() is called on this disposable...

sut.debug("one").subscribe().dispose()
sut.debug("two").subscribe().dispose()

To understand the difference between .forever and .whileConnected, read the documentation in the "ShareReplayScope.swift" file. Both are refcounted, but the difference is in how re-subscription operators are handled. Here is some test code to show the difference...

class SandboxTests: XCTestCase {
    var scheduler: TestScheduler!
    var observable: Observable<String>!

    override func setUp() {
        super.setUp()
        scheduler = TestScheduler(initialClock: 0)
        // creates an observable that will error on the first subscription, then call `.onNext("A")` on the second.
        observable = scheduler.createObservable(timeline: "-#-A")
    }

    func testWhileConnected() {
        // this shows that re-subscription gets through the while connected share to the source observable
        let result = scheduler.start { [observable] in
            observable!
                .share(scope: .whileConnected)
                .retry(2)
        }
        XCTAssertEqual(result.events, [
            .next(202, "A")
        ])
    }

    func testForever() {
        // however re-subscription doesn't get through on a forever share
        let result = scheduler.start { [observable] in
            observable!
                .share(scope: .forever)
                .retry(2)
        }
        XCTAssertEqual(result.events, [
            .error(201, NSError(domain: "Test Domain", code: -1))
        ])
    }
}

Upvotes: 1

Fabio Felici
Fabio Felici

Reputation: 2906

I am not sure why .share(replay: 1, scope: .forever) is not giving the behaviour you want (I also think it should work like you describe) but what about this other way without share?

// You will subscribe to this and not directly on sut (maybe hiding Subject interface to avoid onNext calls from observers)
let subject = ReplaySubject<Int>.create(bufferSize: 1)

let sut = Observable<Int>.create { obs in
  print("Performing work ...")
  obs.onNext(0)
  return Disposables.create()
}

// This subscription is hidden, happens only once and stays alive forever
sut.subscribe(subject)

// Observers subscribe to the public stream
subject.subscribe().dispose()
subject.subscribe().dispose()

Upvotes: 0

Related Questions