Tarek

Reputation: 823

How to have a publisher emit only to the last subscriber in Combine

Is there a way to have the publisher emit a value only to the latest subscriber/observer?

For example, consider a manager class that multiple observers can subscribe to. When an event occurs, I would like only the latest subscriber to be notified. As far as I know, a publisher has no way to keep track of its subscribers, but my knowledge of Combine and reactive programming is limited, so I am not sure whether this is possible in the first place.

Upvotes: 2

Views: 2260

Answers (3)

Nikolai Nagornyi

Reputation: 1461

Use combineLatest(_:) when you want the downstream subscriber to receive a tuple of the most-recent element from multiple publishers when any of them emit a value. To pair elements from multiple publishers, use zip(_:) instead. To receive just the most-recent element from multiple publishers rather than tuples, use merge(with:).
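As a rough illustration of how these three operators differ, here is a minimal sketch. The subjects left and right and the result arrays are hypothetical names chosen for this example; results are collected into arrays so the comparison does not depend on the order in which subscribers are called.

```swift
import Combine

// Hypothetical subjects standing in for "multiple publishers".
let left = PassthroughSubject<Int, Never>()
let right = PassthroughSubject<Int, Never>()

var latest: [[Int]] = []   // values seen through combineLatest
var zipped: [[Int]] = []   // values seen through zip
var merged: [Int] = []     // values seen through merge
var cancellables = Set<AnyCancellable>()

// Emits a tuple of the most recent values once both sides have emitted.
left.combineLatest(right)
    .sink { pair in latest.append([pair.0, pair.1]) }
    .store(in: &cancellables)

// Pairs the n-th value of left with the n-th value of right.
left.zip(right)
    .sink { pair in zipped.append([pair.0, pair.1]) }
    .store(in: &cancellables)

// Forwards every value from either side as a plain element.
left.merge(with: right)
    .sink { value in merged.append(value) }
    .store(in: &cancellables)

left.send(1)
right.send(10)
left.send(2)

print(latest) // [[1, 10], [2, 10]]
print(zipped) // [[1, 10]]
print(merged) // [1, 10, 2]
```

All three operators combine several upstream publishers into one stream; none of them changes which downstream subscriber receives a value.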

Upvotes: 0

alex_shepard

Reputation: 31

I had a similar problem. Here is my solution:

  • When a new subscriber is added, the previous subscriber is cancelled and the last value is set to nil.
  • When sending a new value, only the current subscriber will receive the value.

I use the publisher inside a service; whichever view is currently active subscribes to it through that service.

import Combine

final class SingleSubscriberSubject<Output, Failure>: Publisher where Failure: Error {
    // The generic parameters Output and Failure already satisfy Publisher's
    // associated types, so no explicit typealiases are needed.

    private var lastValue: Output?
    private var lastSubscriber: AnySubscriber<Output, Failure>?

    func receive<S>(subscriber: S) where S: Subscriber, Failure == S.Failure, Output == S.Input {
        if lastSubscriber != nil {
            lastValue = nil
        }
        lastSubscriber = AnySubscriber(subscriber)

        let subscription = Subscription(
            subscriber: subscriber,
            lastValue: lastValue
        ) { [weak self] in
            self?.lastSubscriber = nil
            self?.lastValue = nil
        }

        subscriber.receive(subscription: subscription)
    }

    func send(_ value: Output?) {
        lastValue = value

        guard
            let lastSubscriber = lastSubscriber,
            let value = value
        else {
            return
        }

        _ = lastSubscriber.receive(value)
    }
}

And here is the Subscription:

private extension SingleSubscriberSubject {
    final class Subscription<S: Subscriber> {
        private var subscriber: S?
        private var lastValue: S.Input?
        private let onCancel: () -> Void

        init(subscriber: S, lastValue: S.Input?, onCancel: @escaping () -> Void) {
            self.subscriber = subscriber
            self.lastValue = lastValue
            self.onCancel = onCancel
        }
    }
}

extension SingleSubscriberSubject.Subscription: Cancellable {
    func cancel() {
        subscriber = nil
        onCancel()
    }
}

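// Qualified with the module name so the protocol is not confused with the nested class of the same name.
extension SingleSubscriberSubject.Subscription: Combine.Subscription {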
    func request(_ demand: Subscribers.Demand) {
        guard let subscriber = subscriber, demand > 0 else { return }

        if let lastValue = lastValue {
            _ = subscriber.receive(lastValue)
            self.lastValue = nil
        }
    }
}

Gist is available here

Upvotes: 0

dennis_ka

Reputation: 113

You are right: unfortunately, there is no way to list or track the subscribers of a publisher. To solve your problem, you have to build a custom publisher, and there are two ways to do that. Either you implement one by conforming to the Publisher protocol, which Apple advises against (see here), or you build one out of existing types, as Apple recommends. I have prepared an example of the second option.

The logic is very simple. We create a class that wraps a PassthroughSubject (a CurrentValueSubject would work as well). The class exposes the usual send(_:), send(completion:), and sink(receiveCompletion:receiveValue:) methods and forwards each call to the wrapped subject. In the sink method we store every subscription we hand out, BUT before we add a new subscription to the Set, we go through all the previously cached subscriptions and cancel them. This way only the last subscription stays active.


import Combine

// The subscriptions are cached in the publisher.
// To avoid strong references, I use the WeakBox recommendation from the Swift forum.
struct WeakBox<T: AnyObject & Hashable>: Hashable {
    weak var item: T?

    func hash(into hasher: inout Hasher) {
        hasher.combine(item)
    }
}

class MyPublisher<T, E: Error> {
    private let subject = PassthroughSubject<T, E>()
    private var subscriptions = Set<WeakBox<AnyCancellable>>()
    
    deinit {
        subscriptions.removeAll()
    }
    
    public func send(_ input: T) {
        subject.send(input)
    }
    
    public func send(completion: Subscribers.Completion<E>) {
        subject.send(completion: completion)
    }
    
    public func sink(receiveCompletion receivedCompletion: @escaping (Subscribers.Completion<E>) -> Void, receiveValue receivedValue: @escaping (T) -> Void) -> AnyCancellable {
        let subscription = subject
            .sink(receiveCompletion: { completion in
                receivedCompletion(completion)
            }, receiveValue: { value in
                receivedValue(value)
            })
        
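        // Cancel previous subscriptions, then drop their boxes
        // so the set does not grow with every new subscriber.
        subscriptions.forEach { $0.item?.cancel() }
        subscriptions.removeAll()
        
        // Add new subscription.
        subscriptions.insert(WeakBox(item: subscription))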
        
        return subscription
    }
}

I tested the class in Playground as follows.

let publisher = MyPublisher<Int, Never>()

let firstSubscription = publisher
    .sink(receiveCompletion: { completion in
        print("1st subscription completion \(completion)")
    }, receiveValue: { value in
        print("1st subscription value \(value)")
    })

let secondSubscription = publisher
    .sink(receiveCompletion: { completion in
        print("2nd subscription completion \(completion)")
    }, receiveValue: { value in
        print("2nd subscription value \(value)")
    })

let thirdSubscription = publisher
    .sink(receiveCompletion: { completion in
        print("3rd subscription completion \(completion)")
    }, receiveValue: { value in
        print("3rd subscription value \(value)")
    })

publisher.send(123)

Console output:

3rd subscription value 123

If you comment out the line subscriptions.forEach { $0.item?.cancel() }, then you get:

3rd subscription value 123
1st subscription value 123
2nd subscription value 123
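A lighter variation on the same idea, offered only as a sketch: instead of caching and cancelling earlier subscriptions, each subscription can be tagged with a counter and filtered so that only the most recent tag lets values through. The names LatestOnlyRelay and latestID are hypothetical, the sketch is not thread-safe, and earlier subscriptions stay alive (they just stop receiving values) until their AnyCancellable is released.

```swift
import Combine

// Hypothetical wrapper: only the most recently created sink receives values.
final class LatestOnlyRelay<Value> {
    private let subject = PassthroughSubject<Value, Never>()
    private var latestID = 0   // tag of the newest subscriber

    func send(_ value: Value) {
        subject.send(value)
    }

    func sink(_ receiveValue: @escaping (Value) -> Void) -> AnyCancellable {
        latestID += 1
        let id = latestID
        return subject
            // Older subscribers fail this check and silently drop the value.
            .filter { [weak self] _ in self?.latestID == id }
            .sink(receiveValue: receiveValue)
    }
}

let relay = LatestOnlyRelay<Int>()
var log: [String] = []

let first = relay.sink { log.append("first \($0)") }
let second = relay.sink { log.append("second \($0)") }

relay.send(1)
print(log) // ["second 1"]
```

The trade-off is that older subscribers are muted rather than cancelled, so they never receive a signal that they have been replaced.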

I hope this helps.

Upvotes: 1
