Reputation: 823
Is there a way to have the publisher emit a value only to the latest subscriber/observer?
An example for that would be; a manager class that can be subscribed to by multiple observers. When an event occurs, I would like only the latest subscriber to be observed. As far as I know, there is no way for the publisher to keep track of its subscribers but my knowledge regarding Combine and reactive programming is limited so I am unsure if this is possible in the first place.
Upvotes: 2
Views: 2260
Reputation: 1461
Use combineLatest(_:) when you want the downstream subscriber to receive a tuple of the most-recent element from multiple publishers when any of them emit a value. To pair elements from multiple publishers, use zip(_:) instead. To receive just the most-recent element from multiple publishers rather than tuples, use merge(with:).
Upvotes: 0
Reputation: 31
Had a similar problem. Here is my Solution:
II use the publisher in a service and depending on the view it is subscribed via the service
final class SingleSubscriberSubject<Output, Failure>: Publisher where Failure: Error {
typealias Output = Output
typealias Failure = Failure
private var lastValue: Output?
private var lastSubscriber: AnySubscriber<Output, Failure>?
func receive<S>(subscriber: S) where S: Subscriber, Failure == S.Failure, Output == S.Input {
if lastSubscriber != nil {
lastValue = nil
}
lastSubscriber = AnySubscriber(subscriber)
let subscription = Subscription(
subscriber: subscriber,
lastValue: lastValue
) { [weak self] in
self?.lastSubscriber = nil
self?.lastValue = nil
}
subscriber.receive(subscription: subscription)
}
func send(_ value: Output?) {
lastValue = value
guard
let lastSubscriber = lastSubscriber,
let value = value
else {
return
}
_ = lastSubscriber.receive(value)
}
}
and the Subscription
private extension SingleSubscriberSubject {
final class Subscription<S: Subscriber> {
private var subscriber: S?
private var lastValue: S.Input?
private let onCancel: () -> Void
init(subscriber: S, lastValue: S.Input?, onCancel: @escaping () -> Void) {
self.subscriber = subscriber
self.lastValue = lastValue
self.onCancel = onCancel
}
}
}
extension SingleSubscriberSubject.Subscription: Cancellable {
func cancel() {
subscriber = nil
onCancel()
}
}
extension SingleSubscriberSubject.Subscription: Subscription {
func request(_ demand: Subscribers.Demand) {
guard let subscriber = subscriber, demand > 0 else { return }
if let lastValue = lastValue {
_ = subscriber.receive(lastValue)
self.lastValue = nil
}
}
}
Gist is available here
Upvotes: 0
Reputation: 113
You are right. Unfortunately, there is no way to list/track subscribers of a publisher. To solve your problem, you have to implement a custom publisher. There are two possibilities here. Either you implement a custom publisher with the Publisher
protocol, but Apple advises against this (see here), or you create a custom publisher with already existing types, as Apple recommends. I have prepared an example for the second option.
The logic is very simple. We create a publisher with a PassthroughSubject
inside (it can also be a CurrentValueSubject
). Then we implement the methods typical of a PassthroughSubject
and use them to overwrite the same methods of the PassthroughSubject
, which is inside our class. In the sink
method we store all returning subscriptions BUT before we add a new subscription to the Set
, we go through all the already cached subscriptions and cancel them. This way we achieve the goal that only the last subscription works.
// The subscriptions will be cached in the publisher.
// To avoid strong references, I use the WeakBox recommendation from the Swift forum.
struct WeakBox<T: AnyObject & Hashable>: Hashable {
weak var item: T?
func hash(into hasher: inout Hasher) {
hasher.combine(item)
}
}
class MyPublisher<T, E: Error> {
private let subject = PassthroughSubject<T, E>()
private var subscriptions = Set<WeakBox<AnyCancellable>>()
deinit {
subscriptions.removeAll()
}
public func send(_ input: T) {
subject.send(input)
}
public func send(completion: Subscribers.Completion<E>) {
subject.send(completion: completion)
}
public func sink(receiveCompletion receivedCompletion: @escaping (Subscribers.Completion<E>) -> Void, receiveValue receivedValue: @escaping (T) -> Void) -> AnyCancellable {
let subscription = subject
.sink(receiveCompletion: { completion in
receivedCompletion(completion)
}, receiveValue: { value in
receivedValue(value)
})
// Cancel previous subscriptions.
subscriptions.forEach { $0.item?.cancel() }
// Add new subscription.
subscriptions.insert(WeakBox(item: subscription))
return subscription
}
}
I tested the class in Playground as follows.
let publisher = MyPublisher<Int, Never>()
let firstSubscription = publisher
.sink(receiveCompletion: { completion in
print("1st subscription completion \(completion)")
}, receiveValue: { value in
print("1st subscription value \(value)")
})
let secondSubscription = publisher
.sink(receiveCompletion: { completion in
print("2st subscription completion \(completion)")
}, receiveValue: { value in
print("2st subscription value \(value)")
})
let thirdSubscription = publisher
.sink(receiveCompletion: { completion in
print("3st subscription completion \(completion)")
}, receiveValue: { value in
print("3st subscription value \(value)")
})
publisher.send(123)
Console output:
3st subscription value 123
If you comment out the line subscriptions.forEach { $0.cancel() }
, then you get:
3st subscription value 123
1st subscription value 123
2st subscription value 123
Hopefully I could help you.
Upvotes: 1