drekka
drekka

Reputation: 21883

Combine: Is this a good technique for a custom publisher?

I'm playing about with writing a custom Combine publisher in order to better understand how I can turn various classes into them. Admittedly this is not something I want to do a lot, I just want to understand how it could be done if I need to.

The scenario I'm working with is where I have a class that generates values over time and potentially has multiple subscribers listening. It's not a case of the publisher generating values when requested, but pushing values when it desires. This might occur (for example) when reading text, or with random input from a UI.

To test this out I've started with a simple integer generator that's something like this:

class IntPublisher {
                
    func generate() {
        DispatchQueue.global(qos: .background).async { [weak self] in
            self?.send(0)
            self?.send(1)
            self?.send(2)
            self?.complete()
        }
    }
        
    private func send(_ value: Int) {
        queueOnMain()
    }
        
    func queueOnMain() {
        Thread.sleep(forTimeInterval: 0.5)
        DispatchQueue.main.async { /* ... */ }
    }
}

And here's the generator as a Publisher and Subscription:

class IntPublisher: Publisher {
    
    typealias Output = Int
    typealias Failure = Never
    
    class Subscription: Combine.Subscription, Equatable {
        
        private var subscriber: AnySubscriber<Int, Never>?
        private var didFinish: ((Subscription) -> Void)?
        
        init<S>(subscriber: S, didFinish:@escaping (Subscription) -> Void) where S: Subscriber, S.Input == Output, S.Failure == Failure {
            self.subscriber = AnySubscriber(subscriber)
            self.didFinish = didFinish
        }
        
        func request(_ demand: Subscribers.Demand) {
        }
        
        func cancel() {
            finish()
        }
        
        func complete() {
            self.subscriber?.receive(completion: .finished)
            finish()
        }
        
        func finish() {
            didFinish?(self)
            subscriber = nil
            didFinish = nil
        }
        
        func send(_ value: Int) {
            _ = subscriber?.receive(value)
        }
        
        static func == (lhs: PublisherTests.IntPublisher.Subscription, rhs: PublisherTests.IntPublisher.Subscription) -> Bool {
            return lhs.subscriber?.combineIdentifier == rhs.subscriber?.combineIdentifier
        }
    }
    
    var subscriptions = [Subscription]()
    
    func receive<S>(subscriber: S) where S : Subscriber, S.Failure == Failure, S.Input == Output {
        
        let subscription = Subscription(subscriber: subscriber) { [weak self] (subscription) in
            self?.subscriptions.remove(subscription)
        }
        
        subscriptions.append(subscription)
        subscriber.receive(subscription: subscription)
    }
    
    func generate() {
        DispatchQueue.global(qos: .background).async { [weak self] in
            self?.send(0)
            self?.send(1)
            self?.send(2)
            self?.complete()
        }
    }
    
    private func send(_ value: Int) {
        queueOnMain { $0.send(value) }
    }
    
    private func complete() {
        queueOnMain { $0.complete() }
    }
    
    func queueOnMain(_ block: @escaping (Subscription) -> Void) {
        Thread.sleep(forTimeInterval: 0.5)
        DispatchQueue.main.async { self.subscriptions.forEach { block($0) } }
    }
}

My question revolves around the way I've had to track the subscriptions in the publisher. Because it's generating the values and needs to forward them to the subscriptions, I've had to setup an array and store the subscriptions within it. In turn I had to find a way for the subscriptions to remove themselves from the publisher's array when they're cancelled or completed because the array effective forms a circular reference between the publisher and subscription.

In all the blogs I've read on custom publishing, they all cover the scenario where a publisher is waiting around for subscribers to request values. The publisher doesn't need to store a reference to the subscriptions because it passes closures which they can call to get a value. My use case is different because the publisher controls the request, not the subscribers.

So my question is this - Is using an array a good way to handle this? or is there something in Combine I've missed?

Upvotes: 2

Views: 2554

Answers (2)

meomeomeo
meomeomeo

Reputation: 942

As Apple suggest for Creating Your Own Publishers. You should use Use a concrete subclass of Subject, a CurrentValueSubject, or @Published

For example:

func operation() -> AnyPublisher<String, Error> {
    let subject = PassthroughSubject<String, Error>()
    subject.send("A")
    subject.send("B")
    subject.send("C")
    return subject.eraseToAnyPublisher()
}

Upvotes: 2

drekka
drekka

Reputation: 21883

New Dev's idea has made a massive reduction in the code. I don't know why I didn't think of it ... oh wait, I do. I was so focused on implementing I clean forgot to consider the option of using a decorator pattern around a subject.

Anyway, here's the (much simpler) code:

class IntPublisher2: Publisher {
    
    typealias Output = Int
    typealias Failure = Never
    private let passThroughSubject = PassthroughSubject<Output, Failure>()
            
    func receive<S>(subscriber: S) where S : Subscriber, S.Failure == Failure, S.Input == Output {
        passThroughSubject.receive(subscriber: subscriber)
    }
    
    func generate() {
        DispatchQueue.global(qos: .background).async {
            Thread.sleep(forTimeInterval: 0.5)
            self.passThroughSubject.send(0)
            Thread.sleep(forTimeInterval: 0.5)
            self.passThroughSubject.send(1)
            Thread.sleep(forTimeInterval: 0.5)
            self.passThroughSubject.send(2)
            Thread.sleep(forTimeInterval: 0.5)
            self.passThroughSubject.send(completion: .finished)
        }
    }
}

Upvotes: 1

Related Questions