Reputation: 107
I am developing a bot with the pyTelegramBotAPI module. It works via webhooks, with Flask + Gunicorn installed as the webhook server. Gunicorn runs with 5 workers for better throughput. The structure of my project looks like this:
app.py
bot.py
In bot.py I have a function for processing updates:
from telebot import types  # pyTelegramBotAPI

def process_the_update(update):
    logger.info(update)
    update = types.Update.de_json(update)
    bot.process_new_updates([update])
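A minimal sketch of how app.py might hand updates to this function (the route path and Flask setup here are my assumptions, not the original code):

import flask
from bot import process_the_update

app = flask.Flask(__name__)

@app.route('/webhook', methods=['POST'])
def webhook():
    # Telegram POSTs each update as JSON; hand it over to bot.py
    process_the_update(flask.request.get_json(force=True))
    return '', 200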
In app.py I imported this function roughly as in the sketch above, so whenever an update comes in, app.py calls it and the bot handles the update. In my bot, a user can call a command that queries an external API for some information. The problem is that this external API has a limit of 3 requests per second, and I need the bot to respect that rate limit. At first I thought of doing it with a Queue, with code like this:
import time
import queue
from queue import Queue

lock_queue = Queue(1)      # used as a lock: only one thread runs the check at a time
requests_queue = Queue(3)  # timestamps of the last 3 API requests

def api_request(argument):
    if lock_queue.empty():
        try:
            requests_queue.put_nowait(time.time())
        except queue.Full:
            lock_queue.put(1)
            first_request_time = requests_queue.get()
            logger.info('First request time: ' + str(first_request_time))
            current_time = time.time()
            passed_time = current_time - first_request_time
            if passed_time >= 1:
                requests_queue.put_nowait(time.time())
                lock_queue.get()
            else:
                logger.info(passed_time)
                time.sleep(1 - passed_time)
                requests_queue.put_nowait(time.time())
                lock_queue.get()
    else:
        lock_queue.put(1)
        first_request_time = requests_queue.get()
        logger.info('First request time: ' + str(first_request_time))
        current_time = time.time()
        passed_time = current_time - first_request_time
        if passed_time >= 1:
            requests_queue.put_nowait(time.time())
            lock_queue.get()
        else:
            logger.info(passed_time)
            time.sleep(1 - passed_time)
            requests_queue.put_nowait(time.time())
            lock_queue.get()
    result = make_api_request(argument)  # the request itself is made by an external module
    return result
My reasoning was that, because pyTelegramBotAPI uses threads for faster update handling, all threads would check requests_queue, which holds the timestamps of the last 3 API requests, and compare the time of the first of those 3 requests with the current time (to check whether a second has passed). And because I needed to be sure that only one thread would do this comparison at a time, I added lock_queue. But there are two problems. Firstly, Gunicorn uses 5 workers, so messages from users can always end up being handled in different processes, and each process has its own queues. Secondly, even if I set the number of workers to the default (1 worker), I still get a 429 error, so I think my code doesn't work as I intended at all.
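For comparison, the single-process version of this idea is simpler to express with one lock and a deque of the last 3 timestamps. A sketch, which still has exactly the per-worker limitation described above (each Gunicorn worker gets its own window, so 5 workers could still make up to 15 requests per second between them):

import time
import threading
from collections import deque

window = deque(maxlen=3)        # timestamps of the last 3 API requests
window_lock = threading.Lock()  # only one thread checks/updates the window at a time

def api_request(argument):
    with window_lock:
        if len(window) == window.maxlen:
            elapsed = time.time() - window[0]  # age of the oldest of the last 3 requests
            if elapsed < 1:
                time.sleep(1 - elapsed)        # wait out the remainder of the second
        window.append(time.time())             # the oldest timestamp is dropped automatically
    return make_api_request(argument)          # same external call as in the question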
I thought of implementing the rate limit with Redis, so that every thread in every process would check the time of the last 3 requests, but I am still not sure this is the right way, and I am not sure how to write it.
I would be glad if someone could suggest ideas or even code examples (the external API does not provide any x-rate-limit header).
Upvotes: 1
Views: 3025
Reputation: 107
I wrote this function, which uses Redis to count requests (based on this tutorial: https://www.binpress.com/tutorial/introduction-to-rate-limiting-with-redis/155):
import time
import redis

r_db = redis.Redis(port=port, db=db)

def limit_request(request_to_make, limit=3, per=1, request_name='test', **kwargs):
    # The Lua script runs atomically inside Redis: it increments the counter
    # and, if the limit is exceeded, returns the milliseconds left until the
    # key expires; otherwise it sets the expiry and returns -2.
    over_limit_lua_ = '''
    local key_name = KEYS[1]
    local limit = tonumber(ARGV[1])
    local duration = ARGV[2]

    local key = key_name .. '_num_of_requests'
    local count = redis.call('INCR', key)
    if tonumber(count) > limit then
        local time_left = redis.call('PTTL', key)
        return time_left
    end
    redis.call('EXPIRE', key, duration)
    return -2
    '''
    if not hasattr(r_db, 'over_limit_lua'):
        r_db.over_limit_lua = r_db.register_script(over_limit_lua_)

    # keys must be a list; redis-py would unpack a bare string
    # character by character.
    request_possibility = int(r_db.over_limit_lua(keys=[request_name], args=[limit, per]))

    if request_possibility > 0:
        # Over the limit: wait until the counter expires, then retry.
        time.sleep(request_possibility / 1000.0)
        return limit_request(request_to_make, limit, per, request_name, **kwargs)
    else:
        request_result = request_to_make(**kwargs)
        return request_result
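A hypothetical call site (make_api_request and its user_id keyword argument are placeholders for the real external API call):

# All threads in all Gunicorn workers share the same Redis counter under
# the 'external_api' key, so the limit holds across processes.
result = limit_request(make_api_request, limit=3, per=1,
                       request_name='external_api', user_id=12345)

Because the counter lives in Redis rather than in process memory, every worker and thread sees the same request count, which is what the in-process queues could not provide.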
Upvotes: 1