chris838

Reputation: 5178

Create a moving average (and other FIR filters) using ReactiveCocoa

I'm still getting started with ReactiveCocoa and functional reactive programming concepts, so maybe this is a dumb question.

ReactiveCocoa seems naturally designed to react to streams of live data, such as touch events or accelerometer sensor input.

Is it possible to apply finite impulse response filters in ReactiveCocoa in an easy, reactive fashion? Or if not, what would be the least-ugly hacky way of doing this? How would one go about implementing something like a simple moving average?

Ideally I'm looking for a Swift 2 + RAC 4 solution, but I'm also interested in whether this is possible at all in Objective-C with RAC 2/RAC 3.

Upvotes: 3

Views: 988

Answers (2)

Michał Ciuba

Reputation: 7944

Probably the scan signal operator is what you're looking for. Inspired by Andy Jacobs' answer, I came up with something like this (a simple moving average implementation):

  let (signal, observer) = Signal<Int,NoError>.pipe()

  let maxSamples = 3

  // Keep at most the last `maxSamples` values: once the window is full, drop the oldest before appending.
  let movingAverage = signal.scan( [Int]() ) { (previousSamples, nextValue) in
    let samples : [Int] = previousSamples.count < maxSamples ? previousSamples : Array(previousSamples.dropFirst())
    return samples + [nextValue]
  }
  .filter { $0.count >= maxSamples }
  .map { $0.average }

  movingAverage.observeNext { (next) -> () in
    print("Next: \(next)")
  }

  observer.sendNext(1)
  observer.sendNext(2)
  observer.sendNext(3)
  observer.sendNext(4)
  observer.sendNext(42)

Note: I had to move the average computation into a protocol extension; otherwise the compiler complained that the expression was too complex. I used a nice solution from this answer:

extension Array where Element: IntegerType {
    var total: Element {
        guard !isEmpty else { return 0 }
        return reduce(0){$0 + $1}
    }
    var average: Double {
        guard let total = total as? Int where !isEmpty else { return 0 }
        return Double(total)/Double(count)
    }
}
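
To make the window logic in the scan closure easier to follow, here is a minimal plain-Swift sketch of the same update rule applied to the five values sent above. It uses current Swift syntax rather than Swift 2 and does not use ReactiveCocoa; `slide` and `movingAverages` are hypothetical names introduced only for illustration.

```swift
// Hypothetical helper mirroring the scan closure: keep at most `maxSamples` values.
func slide(_ previous: [Int], _ next: Int, maxSamples: Int) -> [Int] {
    // While the window is not yet full, keep everything; otherwise drop the oldest value.
    let kept = previous.count < maxSamples ? previous : Array(previous.dropFirst())
    return kept + [next]
}

// Hypothetical driver: feeds an array through `slide` the way the signal feeds `scan`.
func movingAverages(_ input: [Int], maxSamples: Int) -> [Double] {
    var window: [Int] = []
    var output: [Double] = []
    for value in input {
        window = slide(window, value, maxSamples: maxSamples)
        // Same role as the `filter` step: only emit once the window is full.
        if window.count >= maxSamples {
            output.append(Double(window.reduce(0, +)) / Double(window.count))
        }
    }
    return output
}

let averages = movingAverages([1, 2, 3, 4, 42], maxSamples: 3)
print(averages)
```

For this input the three full windows are [1, 2, 3], [2, 3, 4] and [3, 4, 42], so the averages are 2, 3 and 49/3 (about 16.33).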

Upvotes: 1

Andy Jacobs

Reputation: 15245

What you actually need is some sort of period buffer, which keeps the last period values buffered and only starts sending them out once the buffer has reached capacity (the code below is heavily inspired by the takeLast operator):

extension SignalType {
    func periodBuffer(period:Int) -> Signal<[Value], Error> {
        return Signal { observer in
            var buffer: [Value] = []
            buffer.reserveCapacity(period)

            return self.observe { event in
                switch event {
                case let .Next(value):
                    // To avoid exceeding the reserved capacity of the buffer, we remove then add.
                    // Remove elements until we have room to add one more.
                    while (buffer.count + 1) > period {
                        buffer.removeAtIndex(0)
                    }

                    buffer.append(value)

                    if buffer.count == period {
                        observer.sendNext(buffer)
                    }
                case let .Failed(error):
                    observer.sendFailed(error)
                case .Completed:
                    observer.sendCompleted()
                case .Interrupted:
                    observer.sendInterrupted()
                }
            }
        }
    }
}

Based on that, you can map the buffered values to any algorithm you want:

let pipe = Signal<Int,NoError>.pipe()

pipe.0
    .periodBuffer(3)
    .map { Double($0.reduce(0, combine: +))/Double($0.count) } // simple moving average
    .observeNext { print($0) }

pipe.1.sendNext(10) // buffer not full yet, prints nothing
pipe.1.sendNext(11) // buffer not full yet, prints nothing
pipe.1.sendNext(15) // prints 12.0
pipe.1.sendNext(7) // prints 11.0
pipe.1.sendNext(9) // prints 10.3333...
pipe.1.sendNext(6) // prints 7.3333...
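
The same buffer could feed a general FIR filter rather than a plain average. The following is only a sketch in current Swift syntax (not Swift 2) and is independent of ReactiveCocoa; `firOutput` and its parameter names are hypothetical, and it assumes the window is ordered oldest first, which is how periodBuffer delivers it.

```swift
// Hypothetical helper: applies FIR coefficients to one buffered window.
// coefficients[0] weights the newest sample, i.e. y[n] = sum over k of h[k] * x[n - k].
func firOutput(window: [Double], coefficients: [Double]) -> Double {
    precondition(window.count == coefficients.count,
                 "window and coefficients must have the same length")
    // The window stores the oldest sample first, so reverse it to line up with h[0], h[1], ...
    return zip(window.reversed(), coefficients).reduce(0.0) { sum, pair in
        sum + pair.0 * pair.1
    }
}

// Equal weights of 1/3 reproduce the simple moving average of the first full window.
let uniform = [Double](repeating: 1.0 / 3.0, count: 3)
let average = firOutput(window: [10, 11, 15], coefficients: uniform)
print(average) // approximately 12
```

Under those assumptions, the closure passed to map after periodBuffer(coefficients.count) would call firOutput on each window (converting the Int values to Double first), and other coefficient sets would give other FIR filters.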

Upvotes: 4
