Dr. Mr. Uncle

Reputation: 604

How do I cancel a Combine subscription within a sink?

I have a somewhat complicated architecture for a feature in my app.

Sample code is below. I expected it to print only once, because I call cancellableSet.removeAll(). Instead, the sink closure ends up being called twice, which creates problems in my application.

How do I make the sink closure fire only after the subscription has been stored in the cancellable set?

Note that I have a few restrictions here that I'll mention. My sample code is just simplifying this.

import Combine

enum State {
    case loggedOut
    case doingSomething
}

let aState = CurrentValueSubject<State, Never>(.doingSomething)
private var cancellableSet: Set<AnyCancellable> = []

func logUserOut() {
    cancellableSet.removeAll()
    aState.send(.loggedOut)
}

func doSomethingElse() { }
aState.sink { newState in
    print("numberOfSubscriptions is: \(cancellableSet.count)")
    switch newState {
    case .loggedOut:
        doSomethingElse()
    case .doingSomething:
        logUserOut()
    }
    
}
.store(in: &cancellableSet)

Upvotes: 3

Views: 2456

Answers (2)

rob mayoff

Reputation: 385580

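The problem in your code is that the subscription starts delivering values synchronously, before the call to sink returns and so before the call to store even begins. When logUserOut calls cancellableSet.removeAll(), the set is still empty, so nothing is cancelled and the sink goes on to receive .loggedOut as well.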
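To see the ordering for yourself, here is a minimal sketch. It assumes a throwaway CurrentValueSubject<Int, Never> and a hypothetical helper named demoSynchronousDelivery, neither of which is in the question. It records when the closure runs relative to sink returning and store being called.

```swift
import Combine

// Hypothetical helper, not from the question: records the order of events.
func demoSynchronousDelivery() -> [String] {
    let subject = CurrentValueSubject<Int, Never>(1)
    var cancellables: Set<AnyCancellable> = []
    var events: [String] = []

    // The closure is invoked with the current value while sink is still executing.
    let cancellable = subject.sink { value in
        events.append("received \(value), stored: \(cancellables.count)")
    }
    events.append("sink returned")

    // Only now does the subscription end up in the set.
    cancellable.store(in: &cancellables)
    events.append("stored: \(cancellables.count)")
    return events
}

demoSynchronousDelivery().forEach { print($0) }
```

The "received" entry is logged first, with a stored count of 0, which is the same situation logUserOut finds itself in when it calls removeAll().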

One way to solve this is to turn aState into a ConnectablePublisher before subscribing. A ConnectablePublisher doesn't publish until its connect method is called. So call connect after store returns.

You can use the makeConnectable method on any Publisher whose Failure == Never to wrap it in a ConnectablePublisher.

let connectable = aState.makeConnectable()
connectable.sink { newState in
    print("numberOfSubscriptions is: \(cancellableSet.count)")
    switch newState {
    case .loggedOut:
        doSomethingElse()
    case .doingSomething:
        logUserOut()
    }
}
.store(in: &cancellableSet)
connectable.connect()
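Note that connect returns a Cancellable of its own, which the snippet above discards. If you ever need to disconnect from the upstream subject, you can hold on to it. The following is only a sketch with a throwaway Int subject and a hypothetical helper named demoConnection. It checks that nothing is delivered before connect, and that nothing is forwarded after the connection is cancelled.

```swift
import Combine

// Hypothetical helper, not from the question.
func demoConnection() -> (beforeConnect: [Int], afterDisconnect: [Int]) {
    let subject = CurrentValueSubject<Int, Never>(1)
    let connectable = subject.makeConnectable()
    var cancellables: Set<AnyCancellable> = []
    var received: [Int] = []

    // Subscribing and storing happen first; nothing is delivered yet.
    connectable
        .sink { value in received.append(value) }
        .store(in: &cancellables)
    let beforeConnect = received

    // connect() starts delivery and returns a Cancellable for the upstream connection.
    let connection = connectable.connect()

    // Cancelling the connection detaches the connectable publisher from the subject,
    // so this later value is not forwarded to the sink.
    connection.cancel()
    subject.send(2)

    return (beforeConnect, received)
}

let result = demoConnection()
print(result.beforeConnect, result.afterDisconnect)
```

Whether you need to keep the connection around depends on your app; if the subject and its subscribers are torn down together anyway, discarding it as above may be fine.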

Upvotes: 2

Sean Goudarzi

Reputation: 1544

If this code runs on a serial queue, you could defer the body of the sink closure to the end of that queue. That way, the subscription is stored in the set before the body runs.

aState.sink { newState in
    DispatchQueue.main.async { // or whatever other queue you are running on
        print("numberOfSubscriptions is: \(cancellableSet.count)")
        switch newState {
        case .loggedOut:
            doSomethingElse()
        case .doingSomething:
            logUserOut()
        }
    }
}
.store(in: &cancellableSet)

It's a bit dirty, though.
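A variation on the same idea, swapping the manual dispatch for Combine's receive(on:) operator, might look like the sketch below. SessionState and SessionDemo are hypothetical names made up for this illustration, and it assumes the code is started from the main thread. Because the value is delivered asynchronously on the main queue, the subscription is already in the set when the closure runs, so removeAll() cancels it before .loggedOut is sent.

```swift
import Combine
import Foundation

// Hypothetical stand-ins for the question's State enum and globals.
enum SessionState { case loggedOut, doingSomething }

final class SessionDemo {
    let state = CurrentValueSubject<SessionState, Never>(.doingSomething)
    private var cancellables: Set<AnyCancellable> = []
    private(set) var received: [SessionState] = []

    func start() {
        state
            .receive(on: DispatchQueue.main) // deliver values asynchronously on the main queue
            .sink { [weak self] newState in
                guard let self = self else { return }
                self.received.append(newState)
                if newState == .doingSomething { self.logUserOut() }
            }
            .store(in: &cancellables)
    }

    private func logUserOut() {
        cancellables.removeAll() // the subscription is stored by now, so this cancels it
        state.send(.loggedOut)
    }
}

let session = SessionDemo()
session.start()
// In a command-line script, give the main queue a chance to run the delivery.
RunLoop.main.run(until: Date().addingTimeInterval(0.1))
print(session.received)
```

The trade-off is the same as with the manual dispatch: every value now arrives one hop later, which may or may not be acceptable for the rest of the pipeline.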

Upvotes: 1
