plu

Reputation: 1351

Rate limit with RxSwift

I'm looking for a smart way of implementing a rate limit in an HTTP client. Let's assume the rate limit on the API is 5 requests per second on any of the resources. Right now the implementation looks similar to this:

final class HTTPClient: HTTPClientProtocol {

    func getUser() -> Observable<User> {
        return Observable<User>.create { (observer) -> Disposable in
            ...
        }
    }

    func getProfile() -> Observable<Profile> {
        return Observable<Profile>.create { (observer) -> Disposable in
            ...
        }
    }

    func getMessages() -> Observable<Messages> {
        return Observable<Messages>.create { (observer) -> Disposable in
            ...
        }
    }

    func getFriends() -> Observable<Friends> {
        return Observable<Friends>.create { (observer) -> Disposable in
            ...
        }
    }

}

Now ideally I would like to use these methods as needed throughout the application without worrying about the rate limit at all.

Back to the example of 5 requests per second: the first five requests can be executed immediately, but every request after that has to wait, so that at most 5 requests are executed within any 1-second window.

Is there any smart way of doing this in RxSwift?

Upvotes: 3

Views: 1269

Answers (2)

GSnyder

Reputation: 480

Daniel T.'s use of a custom scheduler is brilliant, and I'm finding that it works well in practice. Here's a version of his code that implements a true sliding-window rate limit:

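import Foundation
import RxSwift

final class RateLimitedScheduler: ImmediateSchedulerType {

    let period: TimeInterval
    let queue: DispatchQueue

    // Ring buffer holding the deadlines of the last `maxEvents` dispatches.
    // Not thread-safe: call `schedule` from a single thread, or guard `nextDeadline()` with a lock.
    var dispatchHistory: [DispatchTime]
    var dhIndex = 0  // slot holding the oldest deadline in the window

    init(maxEvents: Int, period: TimeInterval, queue: DispatchQueue = .main) {
        precondition(maxEvents > 0, "maxEvents must be positive")
        self.period = period
        self.queue = queue
        // Pretend every slot was used one period ago, so the first `maxEvents` dispatches run immediately.
        let periodAgo = DispatchTime.now() - period
        dispatchHistory = Array(repeating: periodAgo, count: maxEvents)
    }

    func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
        let cancel = SingleAssignmentDisposable()
        queue.asyncAfter(deadline: nextDeadline()) {
            // Skip the work if the subscription was disposed while waiting.
            guard cancel.isDisposed == false else { return }
            cancel.setDisposable(action(state))
        }
        return cancel
    }

    private func nextDeadline() -> DispatchTime {
        // The oldest entry marks the start of the current window; the next
        // dispatch may not run earlier than one period after it.
        let windowStartTime = dispatchHistory[dhIndex]
        let deadline = max(windowStartTime + period, DispatchTime.now())
        // Overwrite the oldest entry and advance to the next-oldest slot.
        dispatchHistory[dhIndex] = deadline
        dhIndex = (dhIndex + 1) % dispatchHistory.count
        return deadline
    }

}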
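
To see how the ring buffer produces the sliding window, here is a minimal sketch of the same deadline arithmetic without RxSwift or Dispatch. `SlidingWindow` is a hypothetical name, and times are plain seconds passed in by the caller (an assumption made so the behaviour can be checked deterministically); it is a model of `nextDeadline()`, not a replacement for the scheduler.

```swift
import Foundation

// Hypothetical model of RateLimitedScheduler.nextDeadline(); times are plain seconds.
struct SlidingWindow {
    let period: TimeInterval
    private var history: [TimeInterval]  // deadlines of the last `maxEvents` dispatches
    private var index = 0                // slot holding the oldest deadline

    init(maxEvents: Int, period: TimeInterval, now: TimeInterval) {
        precondition(maxEvents > 0, "maxEvents must be positive")
        self.period = period
        // Every slot starts one period in the past, so the first burst is not delayed.
        history = Array(repeating: now - period, count: maxEvents)
    }

    mutating func nextDeadline(now: TimeInterval) -> TimeInterval {
        // Never earlier than one period after the oldest dispatch in the window.
        let deadline = max(history[index] + period, now)
        history[index] = deadline
        index = (index + 1) % history.count
        return deadline
    }
}

// Seven requests issued at t = 0 against a limit of 5 per second.
var window = SlidingWindow(maxEvents: 5, period: 1.0, now: 0)
let deadlines = (0..<7).map { _ in window.nextDeadline(now: 0) }
print(deadlines)  // [0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 1.0]
```

The first five requests get a deadline of 0 and run immediately; the sixth and seventh are pushed to t = 1, one period after the requests whose slots they reuse.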

Note that perfect accuracy requires tracking the dispatch times of the previous N entries, so memory use grows with the limit and becomes expensive for rates of hundreds or thousands of operations per period. Consider using a "token bucket" for those cases: it's less accurate but requires only constant state (see this thread).
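
For comparison, a token bucket might look roughly like the sketch below. `TokenBucket` and `reserve(now:)` are hypothetical names, and the clock is again passed in as plain seconds (an assumption for testability); a scheduler would add the returned wait to the current time to get its `asyncAfter` deadline.

```swift
import Foundation

// Hypothetical constant-state token bucket; times are plain seconds.
struct TokenBucket {
    let capacity: Double    // maximum burst size
    let refillRate: Double  // tokens added per second
    private var tokens: Double
    private var lastUpdate: TimeInterval

    init(capacity: Int, period: TimeInterval, now: TimeInterval) {
        precondition(capacity > 0 && period > 0, "capacity and period must be positive")
        self.capacity = Double(capacity)
        self.refillRate = Double(capacity) / period
        self.tokens = Double(capacity)  // start full, so the first burst is not delayed
        self.lastUpdate = now
    }

    // Reserves one token and returns how long the caller must wait before running.
    mutating func reserve(now: TimeInterval) -> TimeInterval {
        // Refill for the time elapsed since the last call, capped at capacity.
        tokens = min(capacity, tokens + (now - lastUpdate) * refillRate)
        lastUpdate = now
        tokens -= 1
        // A negative balance is a debt that is repaid by waiting.
        return tokens >= 0 ? 0 : -tokens / refillRate
    }
}

// Seven requests issued at t = 0 against a limit of 5 per second.
var bucket = TokenBucket(capacity: 5, period: 1.0, now: 0)
let waits = (0..<7).map { _ in bucket.reserve(now: 0) }
print(waits)
```

Only two numbers are stored regardless of the limit. The trade-off is accuracy: here the sixth and seventh requests wait only about 0.2 s and 0.4 s, so seven requests land inside the first second, whereas the sliding window above would hold them until t = 1.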

Upvotes: 2

Daniel T.

Reputation: 33967

You need a custom Scheduler.

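import Foundation
import RxSwift

final class DelayScheduler: ImmediateSchedulerType {

    init(delay: TimeInterval, queue: DispatchQueue = .main) {
        self.queue = queue
        dispatchDelay = delay
    }

    func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
        let cancel = SingleAssignmentDisposable()
        // Run no earlier than `dispatchDelay` after the previously scheduled action.
        // `lastDispatch` is not synchronized, so call this from a single thread.
        lastDispatch = max(lastDispatch + dispatchDelay, .now())
        queue.asyncAfter(deadline: lastDispatch) {
            // Skip the work if the subscription was disposed while waiting.
            guard cancel.isDisposed == false else { return }
            cancel.setDisposable(action(state))
        }
        return cancel
    }

    var lastDispatch: DispatchTime = .now()  // deadline of the most recently scheduled action
    let queue: DispatchQueue
    let dispatchDelay: TimeInterval          // minimum spacing between actions, in seconds
}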

Then you implement your Service by having all its Observables subscribe on this scheduler (in RxSwift 6 and later, subscribeOn(_:) is spelled subscribe(on:)):

final class HTTPClient: HTTPClientProtocol {

    func getUser() -> Observable<User> {
        return Observable<User>.create { (observer) -> Disposable in
            ...
        }.subscribeOn(scheduler)
    }

    func getProfile() -> Observable<Profile> {
        return Observable<Profile>.create { (observer) -> Disposable in
            ...
        }.subscribeOn(scheduler)
    }

    func getMessages() -> Observable<Messages> {
        return Observable<Messages>.create { (observer) -> Disposable in
            ...
        }.subscribeOn(scheduler)
    }

    func getFriends() -> Observable<Friends> {
        return Observable<Friends>.create { (observer) -> Disposable in
            ...
        }.subscribeOn(scheduler)
    }

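    // Minimum spacing between requests: 0.5 s allows 2 requests per second.
    // For the 5 requests per second in the question, 1.0 / 5 (0.2 s) would be enough.
    let scheduler = DelayScheduler(delay: 0.5)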
}
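
As a rough illustration of what the delay means for a given limit, the deadline arithmetic in `DelayScheduler.schedule` can be modelled without RxSwift. `FixedSpacing` is a hypothetical name, times are plain seconds supplied by the caller, and deriving the delay as `period / maxEvents` is an assumption about how one might map "5 requests per second" onto the scheduler.

```swift
import Foundation

// Hypothetical model of DelayScheduler's deadline arithmetic; times are plain seconds.
struct FixedSpacing {
    let delay: TimeInterval
    private(set) var lastDispatch: TimeInterval

    init(maxEvents: Int, period: TimeInterval, now: TimeInterval) {
        precondition(maxEvents > 0, "maxEvents must be positive")
        delay = period / Double(maxEvents)  // e.g. 1.0 / 5 = 0.2 s between requests
        lastDispatch = now                  // mirrors `lastDispatch = .now()` at construction
    }

    mutating func nextDeadline(now: TimeInterval) -> TimeInterval {
        // Same rule as in schedule(_:action:): at least `delay` after the previous deadline.
        lastDispatch = max(lastDispatch + delay, now)
        return lastDispatch
    }
}

var spacing = FixedSpacing(maxEvents: 5, period: 1.0, now: 0)
let burst = (0..<3).map { _ in spacing.nextDeadline(now: 0) }  // three requests issued at t = 0
let later = spacing.nextDeadline(now: 10)                      // one request after a long idle gap
print(burst, later)
```

Requests issued together are spread roughly 0.2 s apart rather than released as a burst of five, and a request issued right after construction still waits one delay. After an idle gap the next request runs immediately.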

Upvotes: 2
