Reputation: 49620
I'm trying to compose a publisher, that, when a subsequent subscriber connects to it, emits the last emitted value immediately to that subscriber, and then continues to emit future values to all connected subscribers.
Q: Is there a way to do this without writing a publisher from scratch, and only by composing built-in publishers? I might be missing something here.
So, if we had a publisher counting every second:
let counter = Timer.publish(every: 1, on: .main, in: .common)
.autoconnect()
.scan(0, { v, _ in v + 1 })
let sharedPublisher = // ??? something with counter publisher above
sharedPublisher.sink { print("A: ", $0 }.store(in: &bag)
// after 2.5 seconds
sharedPublisher.sink { print("B: ", $0 }.store(in: &bag)
The output would be:
A: 1 // at t=1 sec
A: 2 // at t=2
B: 2 // at t=2.5
A: 3 // at t=3
B: 3 // at t=3
Initially, naively, I thought I could just use a .share
and a .buffer
:
let sharedPublisher = counter
.share()
.buffer(size: 1, prefetch: .byRequest, whenFull: .dropOldest)
but of course, that doesn't work since buffer
only buffers if the downstream isn't ready to accept values, which isn't the case here.
Maybe something with Record
/Record.Recording
?
Upvotes: 1
Views: 3596
Reputation: 536047
A publisher that can be shared out (because it's a class, not a struct) and that immediately hands to any new subscriber its most recent value is a CurrentValueSubject. So, for example, this is not quite what you were doing, but it demonstrates the point:
import UIKit
import Combine
func delay(_ delay:Double, closure:@escaping ()->()) {
let when = DispatchTime.now() + delay
DispatchQueue.main.asyncAfter(deadline: when, execute: closure)
}
class ViewController: UIViewController {
var timer : Timer?
let sub = CurrentValueSubject<Int,Never>(0)
var storage = Set<AnyCancellable>()
override func viewDidLoad() {
super.viewDidLoad()
self.timer = Timer.scheduledTimer(withTimeInterval: 1, repeats: true) { _ in
self.sub.value += 1
}
delay(0.5) {
self.sub.sink { print("A:", $0) }.store(in: &self.storage)
}
delay(2.5) {
self.sub.sink { print("B:", $0) }.store(in: &self.storage)
}
}
}
That outputs
A: 0
A: 1
A: 2
B: 2
A: 3
B: 3
which seems to be exactly the effect you're after.
The cool thing is that a Subject can also be an operator (via a publisher's .subscribe
method) so it can exist downstream of something else while maintaining this behavior. The only trick is that you have to retain the Subject (which you can do because the .subscribe
call yields an AnyCancellable). So now I can do it your way, i.e. starting with a timer publisher:
import UIKit
import Combine
func delay(_ delay:Double, closure:@escaping ()->()) {
let when = DispatchTime.now() + delay
DispatchQueue.main.asyncAfter(deadline: when, execute: closure)
}
class ViewController: UIViewController {
var storage = Set<AnyCancellable>()
override func viewDidLoad() {
super.viewDidLoad()
let counter = Timer.publish(every: 1, on: .main, in: .common)
.autoconnect()
.scan(0, { v, _ in v + 1 })
let sub = CurrentValueSubject<Int,Never>(0)
counter.subscribe(sub).store(in: &self.storage)
delay(0.5) {
sub.sink { print("A:", $0) }.store(in: &self.storage)
}
delay(2.5) {
sub.sink { print("B:", $0) }.store(in: &self.storage)
}
}
}
Upvotes: 4