Reputation: 3671
I have a pretty straightforward piece of code where I load a list of IDs from a file, then iterate through the list and, for each ID, call an API (passing the ID value) and dump the API response content into a file.
I would like to speed this process up by making the API calls in parallel; however, the API server allows at most 5 calls per second. Another key consideration is that the calls are slow: on average, each one takes 10 seconds to finish.
I would like to have multiple parallel processes with some way of ensuring that no more than 5 calls occur in any single second.
This is the current code:
import pandas as pd
import numpy as np
from joblib import Parallel, delayed

ids = pd.read_csv('data.csv')
ids = ids['Id'].values.tolist()

def dump_data(df, idx):
    filename = base_dir + '\\' + str(idx) + '.csv'
    df.to_csv(filename, header=True, index=False)  # write data to file

def get_api(idx):
    data = call_some_api(idx)  # api returns data as pandas dataframe, takes about 10 secs
    dump_data(data, idx)

Parallel(n_jobs=10, verbose=50)(delayed(get_api)(idx) for idx in ids)
I'm currently using joblib, but if there is a better library for this solution, it can be used instead.
How can I ensure there will not be more than 5 requests going out in any given second (while at the same time doing all the requests as fast as possible)?
Also, I'm using Python 3.9 on Windows.
Upvotes: 6
Views: 6258
Reputation: 44128
Update 2
After rethinking this a bit more, it makes more sense to use either a standard multithreading pool or multiprocessing pool, depending on your needs, and then to pass to the worker function (either indirectly as a global or explicitly as an argument) a CallingThrottle instance whose throttle method can be called directly by the worker function at the precise point in processing where throttling needs to take place (right before making the request to the website). Passing the throttle instance directly as an argument to your worker function should allow you to use this with joblib (although I would think that in your case all you need is a multithreading pool).
For example:
from multiprocessing.pool import ThreadPool, Pool
from multiprocessing.managers import BaseManager
from threading import Lock
import time

class CallingThrottle:
    def __init__(self, nb_call_times_limit, expired_time):
        self.nb_call_times_limit = nb_call_times_limit
        self.expired_time = expired_time
        self.called_timestamps = list()
        self.lock = Lock()

    def throttle(self):
        with self.lock:
            while len(self.called_timestamps) == self.nb_call_times_limit:
                now = time.time()
                self.called_timestamps = list(filter(
                    lambda x: now - x < self.expired_time,
                    self.called_timestamps
                ))
                if len(self.called_timestamps) == self.nb_call_times_limit:
                    time_to_sleep = self.called_timestamps[0] + self.expired_time - now
                    time.sleep(time_to_sleep)
            self.called_timestamps.append(time.time())

# A "managed" CallingThrottle is required for use with multiprocessing:
class CallingThrottleManager(BaseManager):
    pass

CallingThrottleManager.register('CallingThrottle', CallingThrottle)

def init_pool(throttle):
    global calling_throttle
    calling_throttle = throttle

def worker(x):
    """
    Emulate a task that takes 10 seconds to execute.
    Cannot run more than 5 of these per second.
    """
    from datetime import datetime

    calling_throttle.throttle()
    print(datetime.now(), 'x =', x)
    time.sleep(10)
    return x, x * x

def main():
    # Multithreading example:
    calling_throttle = CallingThrottle(5, 1)  # 5 calls every 1 second
    pool = ThreadPool(20)
    init_pool(calling_throttle)
    start = time.time()
    results = pool.map(worker, range(20))
    print('Total elapsed time:', time.time() - start)
    pool.close()
    pool.join()

    print('\n', '-' * 30, '\n', sep='')

    # Multiprocessing example:
    with CallingThrottleManager() as manager:
        calling_throttle = manager.CallingThrottle(5, 1)  # 5 calls every 1 second
        pool = Pool(20, initializer=init_pool, initargs=(calling_throttle,))
        start = time.time()
        results = pool.map(worker, range(20))
        print('Total elapsed time:', time.time() - start)
        pool.close()
        pool.join()

if __name__ == '__main__':
    main()
To use a throttle with joblib:
import pandas as pd
import numpy as np
from joblib import Parallel, delayed
from multiprocessing.managers import BaseManager
from threading import Lock
import time

class CallingThrottle:
    def __init__(self, nb_call_times_limit, expired_time):
        self.nb_call_times_limit = nb_call_times_limit
        self.expired_time = expired_time
        self.called_timestamps = list()
        self.lock = Lock()

    def throttle(self):
        with self.lock:
            while len(self.called_timestamps) == self.nb_call_times_limit:
                now = time.time()
                self.called_timestamps = list(filter(
                    lambda x: now - x < self.expired_time,
                    self.called_timestamps
                ))
                if len(self.called_timestamps) == self.nb_call_times_limit:
                    time_to_sleep = self.called_timestamps[0] + self.expired_time - now
                    time.sleep(time_to_sleep)
            self.called_timestamps.append(time.time())

# A "managed" CallingThrottle is required for use with multiprocessing:
class CallingThrottleManager(BaseManager):
    pass

def dump_data(df, idx):
    filename = base_dir + '\\' + str(idx) + '.csv'
    df.to_csv(filename, header=True, index=False)  # write data to file

def get_api(calling_throttle, idx):
    calling_throttle.throttle()
    data = call_some_api(idx)  # api returns data as pandas dataframe, takes about 10 secs
    dump_data(data, idx)

def main():
    ids = pd.read_csv('data.csv')
    ids = ids['Id'].values.tolist()

    CallingThrottleManager.register('CallingThrottle', CallingThrottle)
    with CallingThrottleManager() as manager:
        calling_throttle = manager.CallingThrottle(5, 1)  # 5 calls every 1 second
        Parallel(n_jobs=10, verbose=50)(
            delayed(get_api)(calling_throttle, idx) for idx in ids
        )

if __name__ == '__main__':
    main()
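Note that the manager proxy above is only needed because joblib may run the workers in separate processes. Since the API calls are I/O-bound, you can instead ask joblib for threads, in which case a plain CallingThrottle instance can be shared directly. A minimal sketch of that variation, assuming joblib's prefer='threads' hint is acceptable for your workload:

def main():
    ids = pd.read_csv('data.csv')
    ids = ids['Id'].values.tolist()

    # Threads share memory, so no manager proxy is required:
    calling_throttle = CallingThrottle(5, 1)  # 5 calls every 1 second
    Parallel(n_jobs=10, prefer='threads', verbose=50)(
        delayed(get_api)(calling_throttle, idx) for idx in ids
    )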
Update 1
I originally implemented the rate-limiting algorithm that was referenced in the comment made by @balmy, and I noticed that there are times when the rate can be exceeded. This phenomenon was commented upon by @mindvirus, where the OP was trying for 5 messages in an 8-second period:
This is good, but can exceed the rate. Let's say at time 0 you forward 5 messages, then at time N * (8/5) for N = 1, 2, ... you can send another message, resulting in more than 5 messages in an 8 second period.
So I am now using a new rate-limiting algorithm.
I have created two classes, RateLimitedProcessPool and RateLimitedThreadPool, for multiprocessing and multithreading respectively, based on the algorithm presented in What's a good rate limiting algorithm?. These classes are like the standard multiprocessing.pool.Pool and multiprocessing.pool.ThreadPool classes except that their __init__ methods take two extra keyword arguments, rate and per, which together specify the maximum rate at which the apply_async method can be called. For example, the values rate=7 and per=3 imply that successive calls to apply_async will throttle so as to allow only a maximum of 7 calls every 3 seconds.
The following code demonstrates this with a simple worker function that emulates the OP's situation: the worker function takes 10 seconds to execute and must be limited to a maximum rate of 5 calls per second. We need to invoke this function 20 times, so the best performance we can achieve is a total run time of approximately 13 seconds.
import multiprocessing.pool
import multiprocessing
import threading
from functools import wraps
import time

class CallingThrottle:
    def __init__(self, nb_call_times_limit, expired_time):
        self.nb_call_times_limit = nb_call_times_limit
        self.expired_time = expired_time
        self.called_timestamps = list()

    def __enter__(self):
        while len(self.called_timestamps) == self.nb_call_times_limit:
            now = time.time()
            self.called_timestamps = list(filter(
                lambda x: now - x < self.expired_time,
                self.called_timestamps
            ))
            if len(self.called_timestamps) == self.nb_call_times_limit:
                time_to_sleep = self.called_timestamps[0] + self.expired_time - now
                time.sleep(time_to_sleep)
        self.called_timestamps.append(time.time())

    def __exit__(self, *exc):
        pass

class RateLimitedPool:
    def __init__(self, rate, per):
        self.calling_throttle = CallingThrottle(rate, per)
        self.first_time = True

    def apply_async(self, *args, **kwargs):
        # There could be a lag between the first call to apply_async
        # and the first task actually starting, so set the first time
        # after the call to apply_async:
        if self.first_time:
            self.first_time = False
            async_result = super().apply_async(*args, **kwargs)
            with self.calling_throttle:
                pass
            return async_result
        else:
            with self.calling_throttle:
                return super().apply_async(*args, **kwargs)

class RateLimitedProcessPool(RateLimitedPool, multiprocessing.pool.Pool):
    def __init__(self, *args, rate=5, per=1, **kwargs):
        multiprocessing.pool.Pool.__init__(self, *args, **kwargs)
        RateLimitedPool.__init__(self, rate, per)
class RateLimitedThreadPool(RateLimitedPool, multiprocessing.pool.ThreadPool):
    def __init__(self, *args, rate=5, per=1, **kwargs):
        multiprocessing.pool.ThreadPool.__init__(self, *args, **kwargs)
        RateLimitedPool.__init__(self, rate, per)

def threadpool(pool):
    def decorate(f):
        @wraps(f)
        def wrap(*args, **kwargs):
            return pool.apply_async(f, args, kwargs)
        return wrap
    return decorate

def processpool(pool):
    def decorate(f):
        @wraps(f)
        def wrap(*args, **kwargs):
            return pool.apply_async(f, args, kwargs)
        return wrap
    return decorate

########################################

def worker(x):
    """
    Emulate a task that takes 10 seconds to execute.
    Cannot run more than 5 of these per second.
    """
    from datetime import datetime

    print(datetime.now(), 'x =', x)
    time.sleep(10)
    return x, x * x

def main():
    args = range(20)
    pool = RateLimitedThreadPool(20, rate=5, per=1)  # 5 per second
    start = time.time()
    for x in args:
        pool.apply_async(worker, args=(x,))
    # Wait for all tasks to complete:
    pool.close()
    pool.join()
    print('Total elapsed time:', time.time() - start)

if __name__ == '__main__':
    main()
Prints:
2021-10-03 07:19:48.002628 x = 0
2021-10-03 07:19:48.002628 x = 1
2021-10-03 07:19:48.002628 x = 3
2021-10-03 07:19:48.002628 x = 4
2021-10-03 07:19:48.002628 x = 2
2021-10-03 07:19:49.005625 x = 5
2021-10-03 07:19:49.005625 x = 6
2021-10-03 07:19:49.005625 x = 8
2021-10-03 07:19:49.005625 x = 7
2021-10-03 07:19:49.005625 x = 9
2021-10-03 07:19:50.008775 x = 10
2021-10-03 07:19:50.008775 x = 11
2021-10-03 07:19:50.008775 x = 13
2021-10-03 07:19:50.008775 x = 12
2021-10-03 07:19:50.008775 x = 14
2021-10-03 07:19:51.012774 x = 15
2021-10-03 07:19:51.012774 x = 16
2021-10-03 07:19:51.012774 x = 17
2021-10-03 07:19:51.012774 x = 18
2021-10-03 07:19:51.012774 x = 19
Total elapsed time: 13.015560150146484
CPU Intensive Example
In the following example I am using a RateLimitedProcessPool, since my worker function is 100% CPU-bound, taking approximately 10 seconds to execute on my desktop. I only have 8 logical cores (4 physical cores), so my pool size is 8, and for this demo I am submitting 8 tasks at a rate of 3 tasks per second. The second 3 tasks will start approximately 1 second after the first 3, and the final 2 tasks will start 1 second after that. Because the number of physical cores becomes a limiting factor, the total running time is a little over 21 seconds.
import multiprocessing.pool
import multiprocessing
import threading
from functools import wraps
import time

class RateLimitedPool:
    # There is a lag between the first call to apply_async and the first task actually starting:
    LAG_TIME = .2  # seconds - needs to be fine-tuned

    def __init__(self, rate, per):
        assert isinstance(rate, int) and rate > 0
        assert isinstance(per, (int, float)) and per > 0
        self.rate = rate
        self.per = per
        self.count = 0
        self.start_time = None
        self.first_time = True

    def _check_allowed(self):
        current_time = time.time()
        if self.start_time is None:
            self.start_time = current_time
            self.count = 1
            return True
        elapsed_time = current_time - self.start_time
        if self.first_time:
            elapsed_time -= self.LAG_TIME
        if elapsed_time >= self.per:
            self.start_time = current_time
            self.count = 1
            self.first_time = False
            return True
        if self.count < self.rate:
            self.count += 1
            return True
        return False

    def apply_async(self, *args, **kwargs):
        while not self._check_allowed():
            time.sleep(.1)  # This can be fine-tuned
        return super().apply_async(*args, **kwargs)

class RateLimitedProcessPool(RateLimitedPool, multiprocessing.pool.Pool):
    def __init__(self, *args, rate=5, per=1, **kwargs):
        multiprocessing.pool.Pool.__init__(self, *args, **kwargs)
        RateLimitedPool.__init__(self, rate, per)

class RateLimitedThreadPool(RateLimitedPool, multiprocessing.pool.ThreadPool):
    def __init__(self, *args, rate=5, per=1, **kwargs):
        multiprocessing.pool.ThreadPool.__init__(self, *args, **kwargs)
        RateLimitedPool.__init__(self, rate, per)

def threadpool(pool):
    def decorate(f):
        @wraps(f)
        def wrap(*args, **kwargs):
            return pool.apply_async(f, args, kwargs)
        return wrap
    return decorate

def processpool(pool):
    def decorate(f):
        @wraps(f)
        def wrap(*args, **kwargs):
            return pool.apply_async(f, args, kwargs)
        return wrap
    return decorate

########################################

ONE_SECOND_ITERATIONS = 20_000_000

def one_second():
    # Busy loop calibrated to take roughly one second of pure CPU:
    sum = 0
    for _ in range(ONE_SECOND_ITERATIONS):
        sum += 1
    return sum

def worker(x):
    """
    Emulate a CPU-bound task that takes 10 seconds to execute.
    Cannot run more than 3 of these per second.
    """
    from datetime import datetime

    print(datetime.now(), 'x = ', x)
    for _ in range(10):
        one_second()
    return x, x * x

def main():
    args = range(8)
    pool = RateLimitedProcessPool(8, rate=3, per=1)  # 3 per second
    start = time.time()
    for x in args:
        pool.apply_async(worker, args=(x,))
    # Wait for all tasks to complete:
    pool.close()
    pool.join()
    print('Total elapsed time:', time.time() - start)

if __name__ == '__main__':
    main()
Prints:
2021-10-03 09:51:32.857166 x = 0
2021-10-03 09:51:32.859168 x = 1
2021-10-03 09:51:32.864166 x = 2
2021-10-03 09:51:33.899890 x = 5
2021-10-03 09:51:33.899890 x = 3
2021-10-03 09:51:33.907888 x = 4
2021-10-03 09:51:34.924889 x = 6
2021-10-03 09:51:34.925888 x = 7
Total elapsed time: 21.22123622894287
Upvotes: 4
Reputation: 44128
The only reasonably accurate way to do this is to perform the rate-limiting algorithm at the point where the worker function is about to call the API that requires the rate limiting. If you attempt to do this at task-submission time, the problem becomes that you have no control over how the tasks are ultimately scheduled, and you can no longer guarantee the number of API calls per second. But moving the algorithm out to each process means that you must now use values in shared memory, with locking, to ensure everything works correctly.
This is achieved by the RateLimiter class, which creates two shared-memory values: one to hold a count of the number of API calls that have been made in the current interval, and another to hold the time at which the current interval expires. Each process in the pool is initialized with a global variable that is an instance of this RateLimiter class and contains references to these shared values:
from multiprocessing import Pool, Lock, Value
import time

class RateLimiter:
    FUDGE_FACTOR = .0015  # due to imprecision of timer: 1.5ms

    def __init__(self, rate, per):
        assert isinstance(rate, int) and rate > 0
        assert isinstance(per, (int, float)) and per > 0
        self._rate = rate
        self._per = per
        self._lock = Lock()
        self._count = Value('i', 0)
        self._expiration = Value('d', 0.0)

    def __call__(self, api_func, *args, **kwargs):
        with self._lock:
            now = time.time()
            if now >= self._expiration.value:
                # A new interval begins now:
                self._expiration.value = now + self._per + self.FUDGE_FACTOR
                self._count.value = 1
            elif self._count.value < self._rate:
                self._count.value += 1
            else:
                # The current interval is full; wait for it to expire:
                time.sleep(self._expiration.value - now)
                self._expiration.value = time.time() + self._per + self.FUDGE_FACTOR
                self._count.value = 1
        return api_func(*args, **kwargs)

def init_pool(the_rate_limiter):
    global rate_limiter
    rate_limiter = the_rate_limiter

def api_func(x):
    from datetime import datetime

    print(datetime.now(), 'x = ', x)
    time.sleep(10 - x / 3)
    return x * x

def worker(x):
    return rate_limiter(api_func, x)

def main():
    # 5 api calls per second:
    rate_limiter = RateLimiter(5, 1.0)
    pool = Pool(20, initializer=init_pool, initargs=(rate_limiter,))
    results = pool.map(worker, range(20))
    pool.close()
    pool.join()
    print(results)

if __name__ == "__main__":
    main()
Prints:
2021-10-04 07:12:32.148725 x = 0
2021-10-04 07:12:32.171724 x = 3
2021-10-04 07:12:32.172724 x = 2
2021-10-04 07:12:32.173724 x = 1
2021-10-04 07:12:32.175724 x = 4
2021-10-04 07:12:33.151015 x = 7
2021-10-04 07:12:33.151015 x = 8
2021-10-04 07:12:33.151015 x = 5
2021-10-04 07:12:33.151015 x = 9
2021-10-04 07:12:33.152014 x = 6
2021-10-04 07:12:34.154015 x = 14
2021-10-04 07:12:34.154015 x = 13
2021-10-04 07:12:34.154015 x = 10
2021-10-04 07:12:34.155014 x = 12
2021-10-04 07:12:34.156015 x = 11
2021-10-04 07:12:35.155018 x = 16
2021-10-04 07:12:35.156014 x = 19
2021-10-04 07:12:35.156014 x = 17
2021-10-04 07:12:35.156014 x = 15
2021-10-04 07:12:35.157015 x = 18
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289, 324, 361]
Upvotes: -1
Reputation: 11075
You could get real fancy with something like this, but I doubt you'll get significantly better performance than something dumb and simple, like just submitting each task to a processing pool with a 1/5th-of-a-second delay. It would be worth testing whether your API call releases the GIL, because using one thread per active (in-flight) call is much less resource-intensive than one process per active call (though it still may be fine; hardware is cheap, right?).
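A rough way to run that test (a sketch, not part of the solution; test_ids is a hypothetical small list of IDs to probe with): time one call alone against several calls running concurrently in threads. If the multi-threaded batch takes about as long as a single call, the call spends its time blocked with the GIL released, so threads are enough.

import time
from concurrent.futures import ThreadPoolExecutor

def timed(n_threads):
    # Run n_threads copies of the call concurrently and return the wall time.
    start = time.time()
    with ThreadPoolExecutor(max_workers=n_threads) as ex:
        list(ex.map(call_some_api, test_ids[:n_threads]))  # test_ids is hypothetical
    return time.time() - start

# If the 4-thread time is close to the 1-thread time, the call releases the GIL:
print('1 thread:', timed(1), 'seconds; 4 threads:', timed(4), 'seconds')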
One potential issue I can see with this is that if a particular batch of calls takes a long time and the input queue of the pool starts filling up, then once the slow tasks complete you may end up submitting too many at once. An easy solution is to provision the processing pool with more workers than you really need: if the average call lasts 10 seconds and you submit 5 per second, you need about 50 workers in flight. You could fairly easily give the pool 100 workers, and as long as the API call isn't too CPU-intensive this is only inefficient in terms of RAM.
The other potential issue is that the endpoint may only allow a certain number of simultaneous connections, which may at times be less than the number of in-flight calls you want. The solution to this would be to limit the number of pool workers to the maximum number of connections, but that conflicts with the solution to the earlier problem.
Here's how I would solve both: give the pool workers an input queue of limited size, so the main thread can't pile up a bunch of jobs in the input queue:
import pandas as pd
from multiprocessing import Process, Queue
from time import sleep

def dump_data(df, idx):
    filename = base_dir + '\\' + str(idx) + '.csv'
    df.to_csv(filename, header=True, index=False)  # write data to file

def get_api(idx):
    data = call_some_api(idx)  # api returns data as pandas dataframe, takes about 10 secs
    dump_data(data, idx)

class STOPFLAG: pass

def worker(in_q):
    while True:
        task = in_q.get()
        if isinstance(task, STOPFLAG):
            return
        get_api(task)

def main():
    # int: n tasks per float: t seconds
    n = 5               # tasks per interval
    t = 1.0             # interval in seconds
    max_in_flight = 20  # limited to 20 concurrent connections

    ids = pd.read_csv('data.csv')
    ids = ids['Id'].values.tolist()

    work_q = Queue(maxsize=1)  # maxsize will cause q.put() to block until it's got space to put something
    pool = [Process(target=worker, args=(work_q,)) for _ in range(max_in_flight)]
    for p in pool: p.start()
    for idx in ids:
        work_q.put(idx)
        sleep(t / n)  # dumb but effective rate limit
    for p in pool: work_q.put(STOPFLAG())
    for p in pool: p.join()  # I p in pools :P

if __name__ == "__main__":
    main()
If your API endpoint returns rate-limit headers, you could fairly easily modify the code to continually update a few shared values from the workers, so the main thread can decide more intelligently when (and how many) tasks to launch:
import time
from multiprocessing import Value

def worker(in_q, limit: Value, remaining: Value, reset: Value):
    while True:
        task = in_q.get()
        if isinstance(task, STOPFLAG):
            return
        response = get_api(task)  # assumes get_api now returns the response
        if "X-Rate-Limit-Limit" in response.headers:
            with limit.get_lock():  # beyond 3.5 you can just call "with limit:" to acquire the lock for synchronization
                limit.value = int(response.headers["X-Rate-Limit-Limit"])
        # same for "X-Rate-Limit-Remaining" and "X-Rate-Limit-Reset"

# then in main:
# ...
    for idx in ids:
        # get the most recent number of remaining connections allowed:
        with remaining.get_lock():
            n = remaining.value
        if n > 0:
            work_q.put(idx)
        else:
            with reset.get_lock():
                sleeptime = reset.value - time.time()
            sleep(sleeptime)  # maybe add an extra bit of sleep time to account for possible clock mismatch
            work_q.put(idx)   # don't drop the id: submit it once the window has reset
Upvotes: -1
Reputation: 610
It's possible to use the concurrent.futures library and a loop that runs 5 threads at a time. I have limited the number of worker threads to 50, but this can probably be higher, as the threads are not doing any CPU-intensive work. With the last sleep(1) statement you are guaranteed to get around (or just under) 5 calls per second, because of the loop processing time.
import pandas as pd
import numpy as np
import concurrent.futures, time

ids = pd.read_csv('data.csv')
ids = ids['Id'].values.tolist()

def dump_data(df, idx):
    filename = base_dir + '\\' + str(idx) + '.csv'
    df.to_csv(filename, header=True, index=False)  # write data to file

def get_api(idx):
    data = call_some_api(idx)  # api returns data as pandas dataframe, takes about 10 secs
    dump_data(data, idx)

N = 5   # number of requests per second
L = 10  # latency per request

with concurrent.futures.ThreadPoolExecutor(max_workers=N*L) as ex:
    # reshape IDs into batches of N items each and loop over them
    for id_list in ((ids[i] for i in range(j*N, min(j*N + N, len(ids)))) for j in range(round(len(ids)/N + 0.5))):
        # wait if there are items on the work queue
        while ex._work_queue.qsize() > 0: time.sleep(1)
        ex.map(get_api, id_list)
        time.sleep(1)
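A simpler variant under the same assumptions (N*L worker threads, one submission every 1/N seconds) paces the submissions directly and avoids reaching into the executor's private _work_queue; this is a sketch, not a drop-in replacement:

with concurrent.futures.ThreadPoolExecutor(max_workers=N*L) as ex:
    futures = []
    for idx in ids:
        futures.append(ex.submit(get_api, idx))
        time.sleep(1/N)  # at most N submissions per second
    concurrent.futures.wait(futures)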
Upvotes: 0