Reputation: 2419
Is there a way I can have Combine operator that resets that automatically sends a nil
downstream after a set amount of time?
Example:
let pub = PassthroughSubject<Int, Never>()
pub
.reset(after: .seconds(5), scheduler: RunLoop.main) // how to build this?
.sink { v in
print("Received \(v ?? "nil")")
}
pub.send(1)
pub.send(2)
Thread.sleep(forTimeInterval: 6.0)
pub.send(3)
// Expected prints:
// "Received 1"
// "Received 2"
// "Received nil"
// "Received 3"
Upvotes: 0
Views: 217
Reputation: 2419
I ended up figuring this out myself:
import Combine
private struct Reset<Upstream: Publisher, S: Scheduler>: Publisher {
typealias Output = Upstream.Output?
typealias Failure = Upstream.Failure
private let upstream: Upstream
private let interval: S.SchedulerTimeType.Stride
private let scheduler: S
init(upstream: Upstream, after interval: S.SchedulerTimeType.Stride, scheduler: S) {
self.upstream = upstream
self.interval = interval
self.scheduler = scheduler
}
func receive<S: Subscriber>(subscriber: S) where Failure == S.Failure, Output == S.Input {
upstream
.map { value in
let timeout = Just<Output>(nil)
.delay(for: interval, scheduler: scheduler)
return Just<Output>(value)
.merge(with: timeout)
.eraseToAnyPublisher()
}
.switchToLatest()
.subscribe(subscriber)
}
}
public extension Publisher {
func reset<S: Scheduler>(after interval: S.SchedulerTimeType.Stride, scheduler: S) -> AnyPublisher<Self.Output?, Self.Failure> {
Reset(upstream: self, after: interval, scheduler: scheduler).eraseToAnyPublisher()
}
}
Understanding switchToLatest
was key here.
To sum it up, whenever I receive a value from upstream I setup up a new publisher that will publish nil
after the specified delay. Using switchToLatest
this publisher will get canceled if a new value comes along before the timeout elapses. If not, nil
is sent downstream.
Upvotes: 1