Reputation: 1351
I'm looking for a smart way of implementing a rate limit in an HTTP client. Let's assume the rate limit on the API is 5 requests per second on any of the resources. Right now the implementation looks similar to this:
// HTTP client as posted in the question. Each endpoint method returns its own
// Observable built with `Observable.create`; the request bodies are elided ("...").
// The visible code attaches no scheduler or throttling operator to any of the
// returned Observables.
final class HTTPClient: HTTPClientProtocol {
// Returns an Observable of `User`; the work runs inside the `create` closure.
func getUser() -> Observable<User> {
return Observable<User>.create { (observer) -> Disposable in
...
}
}
// Returns an Observable of `Profile`; same shape as `getUser()`.
func getProfile() -> Observable<Profile> {
return Observable<Profile>.create { (observer) -> Disposable in
...
}
}
// Returns an Observable of `Messages`; same shape as `getUser()`.
func getMessages() -> Observable<Messages> {
return Observable<Messages>.create { (observer) -> Disposable in
...
}
}
// Returns an Observable of `Friends`; same shape as `getUser()`.
func getFriends() -> Observable<Friends> {
return Observable<Friends>.create { (observer) -> Disposable in
...
}
}
}
Now ideally I would like to use these methods as needed throughout the application without worrying about the rate limit at all.
Back to the example of 5 requests per second: the first five requests can be executed immediately, but every request after that has to wait. In other words, within any 1-second window at most 5 requests may be executed; all other requests have to wait until the window has room again.
Is there any smart way of doing this in RxSwift?
Upvotes: 3
Views: 1269
Reputation: 480
Daniel T's use of a custom scheduler is brilliant, and I'm finding that it works well in practice. Here's a version of his code that implements a true sliding-window rate limit:
/// A scheduler that enforces a sliding-window rate limit: at most `maxEvents`
/// actions are dispatched within any interval of length `period`.
///
/// The scheduler remembers the dispatch time of the last `maxEvents` actions in a
/// ring buffer. A new action may run no earlier than `period` after the action
/// that was dispatched `maxEvents` actions ago.
///
/// - Note: `dispatchHistory` and `dhIndex` are mutated without any
///   synchronization, so `schedule` should be called from a single thread/queue.
final class RateLimitedScheduler: ImmediateSchedulerType {
    /// Length of the sliding window, in seconds.
    let period: TimeInterval
    /// Queue on which scheduled actions are executed.
    let queue: DispatchQueue
    /// Ring buffer holding the dispatch deadlines of the most recent events.
    var dispatchHistory: [DispatchTime]
    /// Index of the oldest entry in `dispatchHistory` (the next slot to overwrite).
    var dhIndex = 0

    /// Creates a rate-limited scheduler.
    /// - Parameters:
    ///   - maxEvents: Maximum number of actions per `period`. Must be at least 1.
    ///   - period: Length of the sliding window, in seconds.
    ///   - queue: Queue on which actions run. Defaults to the main queue.
    init(maxEvents: Int, period: TimeInterval, queue: DispatchQueue = .main) {
        // An empty history would otherwise crash with an index-out-of-range
        // on the first call to `schedule`; fail early with a clear message.
        precondition(maxEvents > 0, "maxEvents must be at least 1")
        self.period = period
        self.queue = queue
        // Seed every slot with "one period ago" so the first `maxEvents`
        // actions are allowed to run immediately.
        let periodAgo = DispatchTime.now() - period
        dispatchHistory = Array(repeating: periodAgo, count: maxEvents)
    }

    /// Schedules `action` to run on `queue` at the next deadline permitted by the
    /// rate limit.
    /// - Returns: A disposable; if it is disposed before the deadline fires, the
    ///   action is skipped (the reserved time slot is still consumed).
    func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
        let cancel = SingleAssignmentDisposable()
        queue.asyncAfter(deadline: nextDeadline()) {
            guard !cancel.isDisposed else { return }
            cancel.setDisposable(action(state))
        }
        return cancel
    }

    /// Reserves and returns the earliest time at which the next action may run,
    /// recording it in the ring buffer in place of the oldest entry.
    private func nextDeadline() -> DispatchTime {
        let windowStartTime = dispatchHistory[dhIndex]
        let deadline = max(windowStartTime + period, DispatchTime.now())
        dispatchHistory[dhIndex] = deadline
        // Advance to the next-oldest slot, wrapping around at the end.
        dhIndex = (dhIndex + 1) % dispatchHistory.count
        return deadline
    }
}
Note that perfect accuracy requires tracking the dispatch times of the previous N entries, so memory use grows linearly with N and becomes expensive for rates of hundreds or thousands of operations per period. Consider using a "token bucket" for those cases - it's less accurate but requires only constant state (see this thread).
Upvotes: 2
Reputation: 33967
You need a custom Scheduler.
/// A scheduler that spaces scheduled actions at least `dispatchDelay` seconds
/// apart on the given queue.
///
/// - Note: `lastDispatch` is mutated without any synchronization, so `schedule`
///   should be called from a single thread/queue.
final class DelayScheduler: ImmediateSchedulerType {
    /// Deadline assigned to the most recently scheduled action.
    var lastDispatch: DispatchTime
    /// Queue on which scheduled actions are executed.
    let queue: DispatchQueue
    /// Minimum spacing between two consecutive actions, in seconds.
    let dispatchDelay: TimeInterval

    /// Creates a delay scheduler.
    /// - Parameters:
    ///   - delay: Minimum spacing between consecutive actions, in seconds.
    ///   - queue: Queue on which actions run. Defaults to the main queue.
    init(delay: TimeInterval, queue: DispatchQueue = .main) {
        self.queue = queue
        dispatchDelay = delay
        // Seed with "one delay ago" so the very first action can run immediately
        // instead of waiting a full `delay` after the scheduler is created.
        lastDispatch = DispatchTime.now() - delay
    }

    /// Schedules `action` to run on `queue` no earlier than `dispatchDelay` after
    /// the previously scheduled action.
    /// - Returns: A disposable; if it is disposed before the deadline fires, the
    ///   action is skipped (the reserved time slot is still consumed).
    func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
        let cancel = SingleAssignmentDisposable()
        lastDispatch = max(lastDispatch + dispatchDelay, .now())
        queue.asyncAfter(deadline: lastDispatch) {
            guard !cancel.isDisposed else { return }
            cancel.setDisposable(action(state))
        }
        return cancel
    }
}
Then you implement your Service by having all its Observables subscribe on this scheduler:
// HTTP client whose endpoints all subscribe on one shared `DelayScheduler`, so
// the subscription work of every request is funnelled through the same scheduler
// instance. The request bodies are elided ("...") in the post.
final class HTTPClient: HTTPClientProtocol {
// Returns an Observable of `User`, subscribed on the shared `scheduler`.
func getUser() -> Observable<User> {
return Observable<User>.create { (observer) -> Disposable in
...
}.subscribeOn(scheduler)
}
// Returns an Observable of `Profile`, subscribed on the shared `scheduler`.
func getProfile() -> Observable<Profile> {
return Observable<Profile>.create { (observer) -> Disposable in
...
}.subscribeOn(scheduler)
}
// Returns an Observable of `Messages`, subscribed on the shared `scheduler`.
func getMessages() -> Observable<Messages> {
return Observable<Messages>.create { (observer) -> Disposable in
...
}.subscribeOn(scheduler)
}
// Returns an Observable of `Friends`, subscribed on the shared `scheduler`.
func getFriends() -> Observable<Friends> {
return Observable<Friends>.create { (observer) -> Disposable in
...
}.subscribeOn(scheduler)
}
// Single scheduler shared by all endpoints; constructed with a delay of 0.5
// (a TimeInterval, i.e. seconds) on DelayScheduler's default queue.
// NOTE(review): a 0.5 s spacing yields 2 subscriptions per second, not the
// 5 per second mentioned in the question — confirm the intended value.
let scheduler = DelayScheduler(delay: 0.5)
}
Upvotes: 2