Reputation: 582
I have a SignalProducer, ProducerA, that emits values at varying intervals. I want to collect the latest N values it emits and create a new producer, ProducerB, that emits an array containing those latest N values.
ProducerB should start emitting once ProducerA has emitted its first N values, and should then emit a new array each time ProducerA emits a new value.
Can someone help me?
Upvotes: 3
Views: 406
Reputation: 694
extension SignalProducer {
    /// Returns a producer that emits arrays holding the most recent `count` values
    /// sent by the receiver.
    ///
    /// An array is sent only when the window holds exactly `count` values, so nothing
    /// is emitted until the receiver has sent that many. Failure, completion and
    /// interruption of the receiver are forwarded unchanged.
    ///
    /// - Parameter count: The number of values each emitted array contains.
    /// - Returns: A producer of arrays ordered from oldest to newest value.
    func latestValues(count: Int) -> SignalProducer<[Value], Error> {
        return SignalProducer<[Value], Error> { observer, lifetime in
            // The window lives inside the start closure, so each start of the
            // returned producer gets its own copy.
            var window: [Value] = []

            let upstream = self.start { event in
                switch event {
                case let .value(value):
                    window.append(value)
                    // Drop the oldest entries once the window exceeds `count`.
                    let overflow = window.count - count
                    if overflow > 0 {
                        window.removeFirst(overflow)
                    }
                    guard window.count == count else { return }
                    observer.send(value: window)
                case let .failed(error):
                    observer.send(error: error)
                case .completed:
                    observer.sendCompleted()
                case .interrupted:
                    observer.sendInterrupted()
                }
            }

            // Dispose of the upstream subscription when the lifetime ends.
            lifetime.observeEnded(upstream.dispose)
        }
    }
}
It emits an array containing the latest N values.
Test:
// Producer of the integers 1 through 10, with its error type promoted to `Error`.
let valuesGenerator = SignalProducer([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
    .promoteError(Error.self)

// Start the windowed producer (window size 3) and print every event it delivers.
valuesGenerator
    .latestValues(count: 3)
    .start { event in
        switch event {
        case let .value(value):
            print("valuesGenerator, value: \(value)")
        case let .failed(error):
            print("valuesGenerator, error: \(error)")
        case .completed:
            print("valuesGenerator, completed")
        case .interrupted:
            print("valuesGenerator, interrupted")
        }
    }
Output:
valuesGenerator, value: [1, 2, 3]
valuesGenerator, value: [2, 3, 4]
valuesGenerator, value: [3, 4, 5]
valuesGenerator, value: [4, 5, 6]
valuesGenerator, value: [5, 6, 7]
valuesGenerator, value: [6, 7, 8]
valuesGenerator, value: [7, 8, 9]
valuesGenerator, value: [8, 9, 10]
valuesGenerator, completed
Upvotes: 0
Reputation: 6377
// Producer/observer pair created with `buffer(5)`.
let (producerA, observerA) = SignalProducer<Int, NoError>.buffer(5)
let n = 3

// The first `n` values, gathered into a single array.
let initialWindow = producerA.take(n).collect()
// Every value after the first `n`, each wrapped in a one-element array.
let laterValues = producerA.skip(n).map { [$0] }

initialWindow
    .takeUntilReplacement(laterValues)
    // Keep the last `n - 1` accumulated elements and append the incoming array.
    .scan([], { $0.suffix(n - 1) + $1 })
    .startWithNext { window in
        print(window)
    }

// Expected console output, as stated by the author of the snippet:
observerA.sendNext(1) // nothing printed
observerA.sendNext(2) // nothing printed
observerA.sendNext(3) // prints [1, 2, 3]
observerA.sendNext(4) // prints [2, 3, 4]
observerA.sendNext(5) // prints [3, 4, 5]
Upvotes: 0
Reputation: 582
I came up with this code
extension SignalProducer {
    /// Creates a new producer that emits an array containing the latest `n` values
    /// emitted by the original producer.
    ///
    /// Nothing is emitted until the original producer has sent `n` values; after that,
    /// one array is emitted for every value it sends.
    ///
    /// The window is accumulated with `scan` instead of a variable captured when this
    /// method is called. A captured variable would be shared by every start of the
    /// returned producer, so a second start would begin with the values left over
    /// from the first.
    ///
    /// - parameter n: The number of most recent values each emitted array contains.
    /// - returns: A producer of arrays of `n` values, ordered oldest to newest.
    @warn_unused_result(message="Did you forget to call `start` on the producer?")
    public func latestValues(n: Int) -> SignalProducer<[Value], Error> {
        return self
            .scan([Value]()) { window, value in
                // Append the new value, then keep only the trailing `n` elements.
                Array((window + [value]).suffix(n))
            }
            .filter { $0.count == n }
    }
}
Upvotes: 0