Reputation: 5710
In Combine
, using only the built-in operators, is there a way to skip an operator on the first value but then apply that operator for all subsequent values?
Consider the following:
publisher
.debounce(...)
.sink(...)
In this arrangement, debounce
will wait for the specified timeout to elapse before passing on the value to sink
. However, there are many times when you only want debounce
to kick-in after the first element. For example, if the user is trying to filter a list of contacts, it's very possible that they only enter one letter into a text field. If that's the case, the application should probably start filtering immediately, without having to wait for the debounce
to timeout.
I'm aware of the Drop
publishers, but I can't seem to find a combination of them that will perform more of a "skip" operation such that the sink
receives every value, but the debounce
is ignored on the first value.
Something like the following:
publisher
.if_first_element_passthrough_to_sink(...), else_debounce(...)
.sink(...)
Is something like this possible with the built-in operators?
Clarification
Some clarification since my original posting wasn't as clear as it should have been... The answer provided by Asperi below is very close, but ideally the first element in a sequence is always delivered, then debounce
would kick in.
Imagine the user is typing the following:
A B C ... (pauses typing for a few seconds) ... D ... (pauses) ... E F G
What I would like is:
A
, D
and E
are delivered immediately.B C
is coalesced into just C
using debounce
F G
is coalesced into just G
using debounce
Upvotes: 7
Views: 3094
Reputation: 386068
In your particular case of debounce
, you might prefer the behavior of throttle
. It sends the first element immediately, and then sends no more than one element per interval
.
Anyway, can you do it with Combine built-ins? Yes, with some difficulty. Should you? Maybe…
Here's a marble diagram of your goal:
Each time a value goes into the kennyc-debouncer, it starts a timer (represented by a shaded region). If a value arrives while the timer is running, the kennyc-debouncer saves the value and restarts the timer. When the timer expires, if any values arrived while the timer was running, the kennyc-debouncer emits the latest value immediately.
The scan
operator allows us to keep state that we mutate each time an input arrives. We need to send two kinds of inputs into scan
: the outputs from the upstream publisher, and timer firings. So let's define a type for those inputs:
fileprivate enum DebounceEvent<Value> {
case value(Value)
case timerFired
}
What kind of state do we need inside our scan
transform? We definitely need the scheduler, the interval, and the scheduler options, so that we can set timers.
We also need a PassthroughSubject
we can use to turn timer firings into inputs to the scan
operator.
We can't actually cancel and restart a timer, so instead, when the timer fires, we'll see whether it should have been restarted. If so, we'll start another timer. So we need to know whether the timer is running, and what output to send when the timer fires, and the restart time for the timer if restarting is necessary.
Since scan
's output is the entire state value, we also need the state to include the output value to send downstream, if any.
Here's the state type:
fileprivate struct DebounceState<Value, S: Scheduler> {
let scheduler: S
let interval: S.SchedulerTimeType.Stride
let options: S.SchedulerOptions?
let subject = PassthroughSubject<Void, Never>()
enum TimerState {
case notRunning
case running(PendingOutput?)
struct PendingOutput {
var value: Value
var earliestDeliveryTime: S.SchedulerTimeType
}
}
var output: Value? = nil
var timerState: TimerState = .notRunning
}
Now let's look at how to actually use scan
with some other operators to implement the kennyc version of debounce:
extension Publisher {
func kennycDebounce<S: Scheduler>(
for dueTime: S.SchedulerTimeType.Stride,
scheduler: S,
options: S.SchedulerOptions? = nil
) -> AnyPublisher<Output, Failure>
{
let initialState = DebounceState<Output, S>(
scheduler: scheduler,
interval: dueTime,
options: options)
let timerEvents = initialState.subject
.map { _ in DebounceEvent<Output>.timerFired }
.setFailureType(to: Failure.self)
return self
.map { DebounceEvent.value($0) }
.merge(with: timerEvents)
.scan(initialState) { $0.updated(with: $1) }
.compactMap { $0.output }
.eraseToAnyPublisher()
}
}
We start by constructing the initial state for the scan
operator.
Then, we create a publisher that turns the Void
outputs of the state's PassthroughSubject
into .timerFired
events.
Finally, we construct our full pipeline, which has four stages:
Turn the upstream outputs (from self
) into .value
events.
Merge the value events with the timer events.
Use scan
to update the debouncing state with the value and timer events. The actual work is done in an updated(with:)
method we'll add to DebounceState
below.
Map the full state down to just the value we want to pass downstream, and discard nulls (which happen when upstream events get suppressed by debouncing).
All that's left is to write the updated(with:)
method. It looks at each incoming event's type (value
or timerFired
) and the state of the timer to decide what the new state should be and, if necessary, set a new timer.
extension DebounceState {
func updated(with event: DebounceEvent<Value>) -> DebounceState<Value, S> {
var answer = self
switch (event, timerState) {
case (.value(let value), .notRunning):
answer.output = value
answer.timerState = .running(nil)
scheduler.schedule(after: scheduler.now.advanced(by: interval), tolerance: .zero, options: options) { [subject] in subject.send() }
case (.value(let value), .running(_)):
answer.output = nil
answer.timerState = .running(.init(value: value, earliestDeliveryTime: scheduler.now.advanced(by: interval)))
case (.timerFired, .running(nil)):
answer.output = nil
answer.timerState = .notRunning
case (.timerFired, .running(.some(let pendingOutput))):
let now = scheduler.now
if pendingOutput.earliestDeliveryTime <= now {
answer.output = pendingOutput.value
answer.timerState = .notRunning
} else {
answer.output = nil
scheduler.schedule(after: pendingOutput.earliestDeliveryTime, tolerance: .zero, options: options) { [subject] in subject.send() }
}
case (.timerFired, .notRunning):
// Impossible!
answer.output = nil
}
return answer
}
}
Does it work? Let's test it:
import PlaygroundSupport
PlaygroundPage.current.needsIndefiniteExecution = true
let subject = PassthroughSubject<String, Never>()
let q = DispatchQueue.main
let start = DispatchTime.now()
let cfStart = CFAbsoluteTimeGetCurrent()
q.asyncAfter(deadline: start + .milliseconds(100)) { subject.send("A") }
// A should be delivered at start + 100ms.
q.asyncAfter(deadline: start + .milliseconds(200)) { subject.send("B") }
q.asyncAfter(deadline: start + .milliseconds(300)) { subject.send("C") }
// C should be delivered at start + 800ms.
q.asyncAfter(deadline: start + .milliseconds(1100)) { subject.send("D") }
// D should be delivered at start + 1100ms.
q.asyncAfter(deadline: start + .milliseconds(1800)) { subject.send("E") }
// E should be delivered at start + 1800ms.
q.asyncAfter(deadline: start + .milliseconds(1900)) { subject.send("F") }
q.asyncAfter(deadline: start + .milliseconds(2000)) { subject.send("G") }
// G should be delivered at start + 2500ms.
let ticket = subject
.kennycDebounce(for: .milliseconds(500), scheduler: q)
.sink {
print("\($0) \(((CFAbsoluteTimeGetCurrent() - cfStart) * 1000).rounded())") }
Output:
A 107.0
C 847.0
D 1167.0
E 1915.0
G 2714.0
I'm not sure why the later events are so delayed. It could just be playground side effects.
Upvotes: 4
Reputation: 258621
If I correctly understood your needs it can be achieved based on Concatenate
as like the following (in pseudo-code):
let originalPublisher = ...
let publisher = Publishers.Concatenate(
prefix: originalPublisher.first(),
suffix: originalPublisher.debounce(for: 0.5, scheduler: RunLoop.main))
.eraseToAnyPublisher()
so, prefix just sends first element downstream from original publisher and finished, afterwards suffix just pass all following elements using debounce
.
Upvotes: 7