pfandrade
pfandrade

Reputation: 2419

Combine reset after operator?

Is there a way I can have Combine operator that resets that automatically sends a nil downstream after a set amount of time?

Example:

let pub = PassthroughSubject<Int, Never>()

pub
    .reset(after: .seconds(5), scheduler: RunLoop.main) // how to build this?
    .sink { v in
    print("Received \(v ?? "nil")")
}

pub.send(1)
pub.send(2)
Thread.sleep(forTimeInterval: 6.0)
pub.send(3)

// Expected prints:
//   "Received 1"
//   "Received 2"
//   "Received nil"
//   "Received 3"

Upvotes: 0

Views: 217

Answers (1)

pfandrade
pfandrade

Reputation: 2419

I ended up figuring this out myself:

import Combine

private struct Reset<Upstream: Publisher, S: Scheduler>: Publisher {
    typealias Output = Upstream.Output?
    typealias Failure = Upstream.Failure
    
    private let upstream: Upstream
    private let interval: S.SchedulerTimeType.Stride
    private let scheduler: S
    
    init(upstream: Upstream, after interval: S.SchedulerTimeType.Stride, scheduler: S) {
        self.upstream = upstream
        self.interval = interval
        self.scheduler = scheduler
    }
    
    func receive<S: Subscriber>(subscriber: S) where Failure == S.Failure, Output == S.Input {
        upstream
            .map { value in
                let timeout = Just<Output>(nil)
                    .delay(for: interval, scheduler: scheduler)
                return Just<Output>(value)
                    .merge(with: timeout)
                    .eraseToAnyPublisher()
            }
            .switchToLatest()
            .subscribe(subscriber)
    }
}

public extension Publisher {
    func reset<S: Scheduler>(after interval: S.SchedulerTimeType.Stride, scheduler: S) -> AnyPublisher<Self.Output?, Self.Failure> {
        Reset(upstream: self, after: interval, scheduler: scheduler).eraseToAnyPublisher()
    }
}

Understanding switchToLatest was key here.

To sum it up, whenever I receive a value from upstream I setup up a new publisher that will publish nil after the specified delay. Using switchToLatest this publisher will get canceled if a new value comes along before the timeout elapses. If not, nil is sent downstream.

Upvotes: 1

Related Questions