gkaimakas

Reputation: 582

ReactiveCocoa - SignalProducer that emits the latest N values in array

I have a SignalProducer, ProducerA, that emits values at varying intervals. I want to collect the latest N values it emits and create a new producer, ProducerB, that emits an array containing those N values.

ProducerB should start emitting once ProducerA has emitted its first N values, and then emit a new array each time ProducerA emits another value.

Can someone help me?

Upvotes: 3

Views: 406

Answers (3)

Alexey Golikov

Reputation: 694

extension SignalProducer {
    func latestValues(count: Int) -> SignalProducer<[Value], Error> {
        SignalProducer<[Value], Error> { (observer: Signal<[Value], Error>.Observer, lifetime: Lifetime) in
            var collectedValues: [Value] = []
            
            let disposable = self
                .start(Signal<Value, Error>.Observer(value: { value in
                    collectedValues.append(value)
                    
                    if collectedValues.count > count {
                        collectedValues.removeFirst(collectedValues.count - count)
                    }
                    
                    if collectedValues.count == count {
                        observer.send(value: collectedValues)
                    }
                },
                                                     failed: { error in
                    observer.send(error: error)
                },
                                                     completed: {
                    observer.sendCompleted()
                },
                                                     interrupted: {
                    observer.sendInterrupted()
                }))
            
            lifetime.observeEnded {
                disposable.dispose()
            }
        }
    }
}

Once the source has emitted N values, this emits an array containing the latest N values, and then a new array for each subsequent value.

Test:

let valuesGenerator = SignalProducer([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
    .promoteError(Error.self)

valuesGenerator
    .latestValues(count: 3)
    .start(Signal.Observer(value: { value in
        print("valuesGenerator, value: \(value)")
    },
                           failed: { error in
        print("valuesGenerator, error: \(error)")
    },
                           completed: {
        print("valuesGenerator, completed")
    },
                           interrupted: {
        print("valuesGenerator, interrupted")
    }))

Output:

valuesGenerator, value: [1, 2, 3]
valuesGenerator, value: [2, 3, 4]
valuesGenerator, value: [3, 4, 5]
valuesGenerator, value: [4, 5, 6]
valuesGenerator, value: [5, 6, 7]
valuesGenerator, value: [6, 7, 8]
valuesGenerator, value: [7, 8, 9]
valuesGenerator, value: [8, 9, 10]
valuesGenerator, completed

Upvotes: 0

linimin

Reputation: 6377

let (producerA, observerA) = SignalProducer<Int, NoError>.buffer(5)
let n = 3

producerA.take(n).collect()
    .takeUntilReplacement(producerA.skip(n).map { [$0] })
    .scan([], { $0.suffix(n - 1) + $1 })
    .startWithNext {
        print($0)
    }

observerA.sendNext(1) // nothing printed
observerA.sendNext(2) // nothing printed
observerA.sendNext(3) // prints [1, 2, 3]
observerA.sendNext(4) // prints [2, 3, 4]
observerA.sendNext(5) // prints [3, 4, 5]
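
To see why the `scan` step produces a sliding window, here is a plain-Swift trace of the same accumulation, with no ReactiveCocoa involved. The names `step`, `inputs`, `window` and `outputs` are illustrative and not part of the code above. The sketch assumes the upstream delivers the collected first N values as one array and every later value as a single-element array.

```swift
let n = 3

// Hypothetical stand-in for the closure passed to `scan`:
// keep the last n - 1 elements of the window, then append the new chunk.
func step(_ window: [Int], _ chunk: [Int]) -> [Int] {
    return Array(window.suffix(n - 1)) + chunk
}

// Assumed upstream: first the collected [1, 2, 3], then each later value wrapped in an array.
let inputs: [[Int]] = [[1, 2, 3], [4], [5]]

var window: [Int] = []
var outputs: [[Int]] = []
for chunk in inputs {
    window = step(window, chunk)
    outputs.append(window)
}

print(outputs) // [[1, 2, 3], [2, 3, 4], [3, 4, 5]]
```

The first chunk passes through unchanged because the initial window is empty. Every later chunk displaces the oldest element.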

Upvotes: 0

gkaimakas

Reputation: 582

I came up with this code:

extension SignalProducer {
    /// Creates a new producer that emits an array containing the latest `n` values
    /// emitted by the original producer. Nothing is emitted until `n` values have arrived.
    @warn_unused_result(message="Did you forget to call `start` on the producer?")
    public func latestValues(n:Int) -> SignalProducer<[Value], Error> {
        var array: [Value] = []
        return self.map { value in
            array.append(value)

            if array.count >= n {
                array.removeFirst(array.count - n)
            }

            return array
        }
        .filter { $0.count == n }
    }
}
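
One caveat, sketched here in plain Swift rather than ReactiveCocoa: the buffer `array` is created once per call to `latestValues`, so every start of the returned producer runs through the same closure and shares that buffer. In the sketch, `makeWindowTransform` is a hypothetical stand-in for the closure passed to `map`, and the two `map` calls over plain arrays stand in for two separate starts of one producer.

```swift
// Hypothetical stand-in for the closure passed to `map` in `latestValues(n:)`.
// `array` lives outside the returned closure, so it outlives any single run.
func makeWindowTransform(n: Int) -> (Int) -> [Int] {
    var array: [Int] = []
    return { value in
        array.append(value)
        if array.count >= n {
            array.removeFirst(array.count - n)
        }
        return array
    }
}

let transform = makeWindowTransform(n: 3)

// Two runs through the same closure, standing in for two starts of one producer.
let firstRun = [1, 2, 3].map(transform)
let secondRun = [7, 8, 9].map(transform)

print(firstRun)  // [[1], [1, 2], [1, 2, 3]]
print(secondRun) // [[2, 3, 7], [3, 7, 8], [7, 8, 9]]
```

The second run begins with values left over from the first. If the producer may be started more than once, creating the buffer inside a per-start closure (for example in a `SignalProducer { observer, lifetime in ... }` initializer) or carrying it through `scan` should avoid the sharing.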

Upvotes: 0
