Alex Wolf

Reputation: 20128

Signal: Collect values over time interval

This might be a trivial question, but I'm unable to find a solution to this seemingly easy task. As I'm new to ReactiveSwift and reactive programming, I might simply be missing something obvious.

Basically what I want to do is something like this:

signal.collect(timeInterval: .seconds(5))

I want to collect all values a signal sends over a specific period of time. The resulting signal would emit an event every x seconds, each containing an array of the values collected from the first signal during that period.

What is the best approach to do this in ReactiveSwift?

Upvotes: 4

Views: 1389

Answers (2)

Petro Korienev

Reputation: 4027

There's no built-in operator in ReactiveSwift for this task. Instead, you can write an extension along the following lines:

import Foundation
import ReactiveSwift
import Result
public extension Signal {
    /// Forwards events from `self`, then completes `after` seconds from the time of this call.
    func completeAfter(after: TimeInterval, onScheduler: DateSchedulerProtocol = QueueScheduler()) -> Signal {
        // Helper pipe: its completion is the trigger that ends the returned signal.
        let pipe = Signal<(), NoError>.pipe()
        onScheduler.schedule(after: Date(timeIntervalSinceNow: after)) {
            pipe.1.sendCompleted()
        }
        // take(until:) already forwards every event from `self`,
        // so there is no need to re-wrap it in a new Signal by hand.
        return self.take(until: pipe.0)
    }

    /// Collects all values sent within `until` seconds into a single array.
    func collectUntil(until: TimeInterval) -> Signal<[Value], Error> {
        return self.completeAfter(after: until).collect()
    }
}

Then call signal.collectUntil(until: 5).

Another way is to use the timer function from ReactiveSwift. Example (add it to the same extension as above):

public func collectUntil2(until: TimeInterval) -> Signal<[Value], Error> {
    var signal: Signal<(), NoError>? = nil
    timer(interval: until, on: QueueScheduler()).startWithSignal { innerSignal, _ in
        signal = innerSignal.map { _ in () }.take(first: 1)
    }
    return self.take(until: signal!).collect()
}

I don't like this approach, however, because it works against the nature of the SignalProducer type by extracting its inner signal.

The Signal type also has a timeout function, but it is awkward to use here because it produces an error. Here is an example of how to use it (again, add it to the same extension):

public func completeOnError() -> Signal<Value, Error> {
    return Signal { observer in
        return self.observe { event in
            switch(event) {
            case .value(let v): observer.send(value: v)
            case .failed(_): observer.sendCompleted()
            case .interrupted: observer.sendInterrupted()
            case .completed: observer.sendCompleted()
            }
        }
    }
}

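public func collectUntil3(until: TimeInterval) -> Signal<[Value], Error> {
    // Caution: the force cast below traps at runtime unless `Error` is a type
    // that an NSError can be cast to. Also note that completeOnError() turns
    // every upstream failure into a completion, not only the timeout.
    return self
        .timeout(after: until,
                 raising: NSError() as! Error,
                 on: QueueScheduler())
        .completeOnError()
        .collect()
}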

PS: whichever of the three options you choose, remember to pass the correct scheduler, or parameterize your solution so the caller can supply one.

Upvotes: 4

Alex Wolf

Reputation: 20128

Based on the answer by Petro Korienev (which sadly wasn't quite what I was looking for), I've created an extension which solves my problem. The extension follows the structure of the ReactiveSwift collect functions, to stay as close as possible to the intent of ReactiveSwift.

It collects all values sent during each timeInterval and then sends them as an array. On a terminating event it also sends the remaining values, if there are any.

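extension Signal {
    func collect(timeInterval: DispatchTimeInterval,
                 on scheduler: QueueScheduler = QueueScheduler()) -> Signal<[Value], Error> {
        return Signal<[Value], Error> { observer in
            // The timer fires on the scheduler's queue while values arrive on the
            // upstream signal's thread, so the buffer is guarded by a lock
            // (NSLock comes from Foundation).
            let lock = NSLock()
            var values: [Value] = []
            // Removes and returns everything collected so far.
            let takeBatch: () -> [Value] = {
                lock.lock()
                defer { lock.unlock() }
                let batch = values
                values.removeAll(keepingCapacity: true)
                return batch
            }
            let disposable = CompositeDisposable()
            // Emit a (possibly empty) batch every `timeInterval`.
            let scheduleDisposable = scheduler.schedule(
                    after: Date(timeInterval: timeInterval.timeInterval, since: scheduler.currentDate),
                    interval: timeInterval,
                    action: { observer.send(value: takeBatch()) }
            )

            disposable += scheduleDisposable
            disposable += self.observe { (event: Event<Value, Error>) in
                if event.isTerminating {
                    // Stop the timer, then flush whatever is still buffered.
                    scheduleDisposable?.dispose()
                    let remaining = takeBatch()
                    if !remaining.isEmpty {
                        observer.send(value: remaining)
                    }
                }

                switch event {
                case let .value(value):
                    lock.lock()
                    values.append(value)
                    lock.unlock()
                case .completed:
                    observer.sendCompleted()
                case let .failed(error):
                    observer.send(error: error)
                case .interrupted:
                    observer.sendInterrupted()
                }
            }

            return disposable
        }
    }
}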

extension SignalProducer {
    func collect(timeInterval: DispatchTimeInterval,
                 on scheduler: QueueScheduler = QueueScheduler()) -> SignalProducer<[Value], Error> {
        return lift { (signal: ProducedSignal) in
            signal.collect(timeInterval: timeInterval, on: scheduler)
        }
    }
}

extension DispatchTimeInterval {
    /// The interval expressed in seconds.
    var timeInterval: TimeInterval {
        switch self {
        case let .seconds(s):
            return TimeInterval(s)
        case let .milliseconds(ms):
            return TimeInterval(ms) / 1_000
        case let .microseconds(us):
            return TimeInterval(us) / 1_000_000
        case let .nanoseconds(ns):
            return TimeInterval(ns) / 1_000_000_000
        case .never:
            // Only present in newer SDKs; treated here as an infinite interval.
            return .infinity
        @unknown default:
            return .infinity
        }
    }
}
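
To make the windowing rule described above concrete without depending on ReactiveSwift, here is a minimal plain-Swift sketch of it. `TimedValue` and `batches(of:every:endingAt:)` are hypothetical names introduced only for this illustration, and timestamps are given as seconds since observation started. It assumes the input is sorted by time and that a value arriving exactly on a tick belongs to the next window; in the real extension that case depends on thread timing.

```swift
import Foundation

/// Hypothetical helper type: a value tagged with its arrival time,
/// in seconds since observation started.
struct TimedValue<Value> {
    let time: TimeInterval
    let value: Value
}

/// Models the rule: one batch per elapsed `interval` (possibly empty),
/// plus a final batch for values still buffered when the stream
/// terminates at `endTime`.
func batches<Value>(of events: [TimedValue<Value>],
                    every interval: TimeInterval,
                    endingAt endTime: TimeInterval) -> [[Value]] {
    precondition(interval > 0, "interval must be positive")
    var result: [[Value]] = []
    var buffer: [Value] = []
    var nextTick = interval

    for event in events where event.time <= endTime {
        // Emit a batch for every tick that fired before this value arrived.
        while nextTick <= event.time {
            result.append(buffer)
            buffer.removeAll(keepingCapacity: true)
            nextTick += interval
        }
        buffer.append(event.value)
    }
    // Ticks that fire after the last value but before termination.
    while nextTick <= endTime {
        result.append(buffer)
        buffer.removeAll(keepingCapacity: true)
        nextTick += interval
    }
    // On termination, flush the remainder, but only if it is non-empty.
    if !buffer.isEmpty {
        result.append(buffer)
    }
    return result
}

let events = [
    TimedValue(time: 1, value: "a"),
    TimedValue(time: 4, value: "b"),
    TimedValue(time: 12, value: "c"),
]
let output = batches(of: events, every: 5, endingAt: 13)
print(output) // [["a", "b"], [], ["c"]]
```

The empty middle batch mirrors the extension's behavior: the timer sends whatever is buffered on every tick, even when nothing arrived, while the final flush on termination is skipped if the buffer is empty.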

Upvotes: 1
