kennyc

Reputation: 5710

How do you apply a Combine operator only after the first message has been received?

In Combine, using only the built-in operators, is there a way to skip an operator on the first value but then apply that operator for all subsequent values?

Consider the following:

publisher
  .debounce(...)
  .sink(...)

In this arrangement, debounce waits for the specified timeout to elapse before passing the value on to sink. However, there are many times when you only want debounce to kick in after the first element. For example, if the user is trying to filter a list of contacts, it's very possible that they enter only one letter into a text field. If that's the case, the application should probably start filtering immediately, without having to wait for the debounce to time out.

I'm aware of the Drop publishers, but I can't seem to find a combination of them that will perform more of a "skip" operation such that the sink receives every value, but the debounce is ignored on the first value.

Something like the following:

publisher
  .if_first_element_passthrough_to_sink(...), else_debounce(...)
  .sink(...)

Is something like this possible with the built-in operators?

Clarification

Some clarification since my original posting wasn't as clear as it should have been... The answer provided by Asperi below is very close, but ideally the first element in a sequence is always delivered, then debounce would kick in.

Imagine the user is typing the following:

A B C ... (pauses typing for a few seconds) ... D ... (pauses) ... E F G

What I would like is:

Upvotes: 7

Views: 3094

Answers (2)

rob mayoff

Reputation: 386068

In your particular case of debounce, you might prefer the behavior of throttle. It sends the first element immediately, and then sends no more than one element per interval.
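
A minimal sketch of the throttle alternative, assuming a hypothetical `searchText` subject standing in for the text field and a 500 ms interval picked only for illustration:

```swift
import Combine
import Foundation

// Hypothetical stand-in for the text field's change events.
let searchText = PassthroughSubject<String, Never>()
var queries: [String] = []

let throttled = searchText
    // latest: true forwards the most recent value seen during each interval.
    .throttle(for: .milliseconds(500), scheduler: DispatchQueue.main, latest: true)
    .sink { queries.append($0) }

// Simulated fast typing; values are delivered on the main queue.
searchText.send("A")
searchText.send("AB")
searchText.send("ABC")
```

Unlike debounce, throttle keeps emitting at a steady rate while the user types continuously, so it is worth checking that this matches the filtering behavior you want.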

Anyway, can you do it with Combine built-ins? Yes, with some difficulty. Should you? Maybe…

Here's a marble diagram of your goal:

[Figure: marble diagram of the modified debounce operator]

If a value goes into the kennyc-debouncer while no timer is running, it emits the value immediately and starts a timer (represented by a shaded region). If a value arrives while the timer is running, the kennyc-debouncer saves the value and restarts the timer. When the timer expires, if any values arrived while the timer was running, the kennyc-debouncer emits the latest of them.

The scan operator allows us to keep state that we mutate each time an input arrives. We need to send two kinds of inputs into scan: the outputs from the upstream publisher, and timer firings. So let's define a type for those inputs:

fileprivate enum DebounceEvent<Value> {
    case value(Value)
    case timerFired
}
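
As a toy illustration of how scan folds a stream of mixed events into running state, here is a sketch that has nothing to do with timers; the `CounterEvent` type and the values are made up for this example:

```swift
import Combine

// Hypothetical event type, similar in shape to DebounceEvent.
enum CounterEvent {
    case add(Int)
    case reset
}

var totals: [Int] = []
let events: [CounterEvent] = [.add(1), .add(2), .reset, .add(5)]

let counter = events.publisher
    // scan emits the updated state after every input.
    .scan(0) { total, event -> Int in
        switch event {
        case .add(let amount): return total + amount
        case .reset: return 0
        }
    }
    .sink { totals.append($0) }

print(totals) // [1, 3, 0, 5]
```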

What kind of state do we need inside our scan transform? We definitely need the scheduler, the interval, and the scheduler options, so that we can set timers.

We also need a PassthroughSubject we can use to turn timer firings into inputs to the scan operator.

We can't actually cancel and restart a timer, so instead, when the timer fires, we'll see whether it should have been restarted. If so, we'll start another timer. So we need to know whether the timer is running, and what output to send when the timer fires, and the restart time for the timer if restarting is necessary.

Since scan's output is the entire state value, we also need the state to include the output value to send downstream, if any.

Here's the state type:

fileprivate struct DebounceState<Value, S: Scheduler> {
    let scheduler: S
    let interval: S.SchedulerTimeType.Stride
    let options: S.SchedulerOptions?

    let subject = PassthroughSubject<Void, Never>()

    enum TimerState {
        case notRunning
        case running(PendingOutput?)

        struct PendingOutput {
            var value: Value
            var earliestDeliveryTime: S.SchedulerTimeType
        }
    }

    var output: Value? = nil
    var timerState: TimerState = .notRunning
}

Now let's look at how to actually use scan with some other operators to implement the kennyc version of debounce:

extension Publisher {
    func kennycDebounce<S: Scheduler>(
        for dueTime: S.SchedulerTimeType.Stride,
        scheduler: S,
        options: S.SchedulerOptions? = nil
    ) -> AnyPublisher<Output, Failure>
    {
        let initialState = DebounceState<Output, S>(
            scheduler: scheduler,
            interval: dueTime,
            options: options)
        let timerEvents = initialState.subject
            .map { _ in DebounceEvent<Output>.timerFired }
            .setFailureType(to: Failure.self)
        return self
            .map { DebounceEvent.value($0) }
            .merge(with: timerEvents)
            .scan(initialState) { $0.updated(with: $1) }
            .compactMap { $0.output }
            .eraseToAnyPublisher()
    }
}

We start by constructing the initial state for the scan operator.

Then, we create a publisher that turns the Void outputs of the state's PassthroughSubject into .timerFired events.

Finally, we construct our full pipeline, which has four stages:

  1. Turn the upstream outputs (from self) into .value events.

  2. Merge the value events with the timer events.

  3. Use scan to update the debouncing state with the value and timer events. The actual work is done in an updated(with:) method we'll add to DebounceState below.

  4. Map the full state down to just the value we want to pass downstream, and discard nil values (which occur when upstream events get suppressed by debouncing).
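
The four stages can be tried out in isolation with a simplified sketch. It assumes a made-up `DemoState` that just holds the latest value until a manually sent "timer" event arrives, so it runs synchronously and leaves out all of the real scheduling:

```swift
import Combine

// Hypothetical, simplified stand-ins for DebounceEvent and DebounceState.
enum DemoEvent {
    case value(String)
    case timerFired
}

struct DemoState {
    var held: String? = nil    // latest value waiting for the timer
    var output: String? = nil  // value to pass downstream, if any
}

let upstream = PassthroughSubject<String, Never>()
let timer = PassthroughSubject<Void, Never>()
var log: [String] = []

let pipeline = upstream
    .map { DemoEvent.value($0) }                           // stage 1
    .merge(with: timer.map { _ in DemoEvent.timerFired })  // stage 2
    .scan(DemoState()) { state, event -> DemoState in      // stage 3
        switch event {
        case .value(let value):
            return DemoState(held: value, output: nil)
        case .timerFired:
            return DemoState(held: nil, output: state.held)
        }
    }
    .compactMap { $0.output }                              // stage 4
    .sink { log.append($0) }

upstream.send("A")
upstream.send("B")
timer.send()        // emits "B", the latest held value
timer.send()        // nothing held, so nothing is emitted
upstream.send("C")
timer.send()        // emits "C"

print(log) // ["B", "C"]
```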

All that's left is to write the updated(with:) method. It looks at each incoming event's type (value or timerFired) and the state of the timer to decide what the new state should be and, if necessary, set a new timer.

extension DebounceState {
    func updated(with event: DebounceEvent<Value>) -> DebounceState<Value, S> {
        var answer = self
        switch (event, timerState) {
        case (.value(let value), .notRunning):
            // No timer is running: emit immediately and start a timer.
            answer.output = value
            answer.timerState = .running(nil)
            scheduler.schedule(
                after: scheduler.now.advanced(by: interval),
                tolerance: .zero,
                options: options
            ) { [subject] in subject.send() }
        case (.value(let value), .running(_)):
            // Timer is running: hold the value and push back its delivery time.
            answer.output = nil
            answer.timerState = .running(.init(
                value: value,
                earliestDeliveryTime: scheduler.now.advanced(by: interval)))
        case (.timerFired, .running(nil)):
            // Timer expired with nothing held: go idle.
            answer.output = nil
            answer.timerState = .notRunning
        case (.timerFired, .running(.some(let pendingOutput))):
            let now = scheduler.now
            if pendingOutput.earliestDeliveryTime <= now {
                // The held value has waited long enough: emit it.
                answer.output = pendingOutput.value
                answer.timerState = .notRunning
            } else {
                // A later value "restarted" the timer: wait for the remainder.
                answer.output = nil
                scheduler.schedule(
                    after: pendingOutput.earliestDeliveryTime,
                    tolerance: .zero,
                    options: options
                ) { [subject] in subject.send() }
            }
        case (.timerFired, .notRunning):
            // Should be unreachable: a timer only fires while in the running state.
            answer.output = nil
        }
        return answer
    }
}

Does it work? Let's test it:

import Combine
import Foundation
import PlaygroundSupport
PlaygroundPage.current.needsIndefiniteExecution = true

let subject = PassthroughSubject<String, Never>()
let q = DispatchQueue.main
let start = DispatchTime.now()
let cfStart = CFAbsoluteTimeGetCurrent()
q.asyncAfter(deadline: start + .milliseconds(100)) { subject.send("A") }
// A should be delivered at start + 100ms.
q.asyncAfter(deadline: start + .milliseconds(200)) { subject.send("B") }
q.asyncAfter(deadline: start + .milliseconds(300)) { subject.send("C") }
// C should be delivered at start + 800ms.
q.asyncAfter(deadline: start + .milliseconds(1100)) { subject.send("D") }
// D should be delivered at start + 1100ms.
q.asyncAfter(deadline: start + .milliseconds(1800)) { subject.send("E") }
// E should be delivered at start + 1800ms.
q.asyncAfter(deadline: start + .milliseconds(1900)) { subject.send("F") }
q.asyncAfter(deadline: start + .milliseconds(2000)) { subject.send("G") }
// G should be delivered at start + 2500ms.

let ticket = subject
    .kennycDebounce(for: .milliseconds(500), scheduler: q)
    .sink {
        print("\($0) \(((CFAbsoluteTimeGetCurrent() - cfStart) * 1000).rounded())") }

Output:

A 107.0
C 847.0
D 1167.0
E 1915.0
G 2714.0

I'm not sure why the later events are so delayed. It could just be playground side effects.

Upvotes: 4

Asperi

Reputation: 258621

If I understood your needs correctly, this can be achieved with Concatenate, as in the following (pseudo-code):

let originalPublisher = ...
let publisher = Publishers.Concatenate(
        prefix: originalPublisher.first(),
        suffix: originalPublisher.debounce(for: 0.5, scheduler: RunLoop.main))
    .eraseToAnyPublisher()

The prefix sends just the first element from the original publisher downstream and then finishes; after that, the suffix passes all following elements through debounce.
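
To try this out, here is a minimal sketch that assumes a hypothetical `typed` subject in place of the original publisher and uses DispatchQueue.main as the scheduler:

```swift
import Combine
import Foundation

// Hypothetical stand-in for the original publisher.
let typed = PassthroughSubject<String, Never>()
var received: [String] = []

let concatenated = Publishers.Concatenate(
        prefix: typed.first(),
        suffix: typed.debounce(for: .milliseconds(500), scheduler: DispatchQueue.main))
    .sink { received.append($0) }

typed.send("A")  // passes straight through via first()
typed.send("B")  // from here on, values go through debounce
typed.send("C")  // delivered once 500 ms pass with no newer value
```

Only the very first element of the whole stream bypasses debounce here. A value typed after a long pause is still debounced, which is the gap the question's clarification points out.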

Upvotes: 7
