Reputation: 6980
I have an API endpoint which can take one or more object IDs and return responses for them, e.g., http://example.com/api/metadata?id=1&id=2&id=3. The API endpoint is rate-limited per call, not per ID, so it is better to call it with many IDs at once.
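For reference, a URL in this repeated-`id` format can be built with the standard library's `urllib.parse.urlencode`, which accepts a list of (key, value) pairs and therefore allows the same key to appear more than once. The `batch_url` helper is a hypothetical name, and the base URL is just the example one above.

```python
from urllib.parse import urlencode

def batch_url(object_ids, base="http://example.com/api/metadata"):
    """Build one request URL that asks for several IDs at once (hypothetical helper)."""
    # A list of pairs lets the "id" key repeat; values are str()-ed and quoted.
    return base + "?" + urlencode([("id", object_id) for object_id in object_ids])

print(batch_url([1, 2, 3]))  # http://example.com/api/metadata?id=1&id=2&id=3
```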
On the other hand, I have existing code which obtains metadata one ID at a time, like:
async def get_metadata(object_id):
    response = await session.get(f"http://example.com/api/metadata?id={object_id}")
    response.raise_for_status()
    return (await response.json())['results'][object_id]
I would like to keep the signature of this function the same, but change it so that it does not make individual requests. Instead, it should block until either (a) 50 IDs are ready to be fetched, or (b) a timeout (say, 10 seconds) expires while some, but fewer than 50, IDs are waiting. Then a single API request is made, and each (blocked) call to get_metadata returns its corresponding result. So the external behavior of get_metadata should stay the same.
I tried a few things using semaphores and queues, but I got stuck. What would be a good approach to implement this?
Upvotes: 3
Views: 2108
Reputation: 27205
After a while of tinkering, I came up with this:
import abc, asyncio

class Batcher(metaclass=abc.ABCMeta):
    def __init__(self, *, max_batch, timeout):
        """
        Constructs a new Batcher. The parameter max_batch specifies
        the queue capacity, while timeout is the deadline after which
        a queue will be processed whether it’s at capacity or not.
        """
        self.__batch = None
        self.__event = None
        self.__timeout = timeout
        self.__maxsize = max_batch
        self.__tasks = set()  # strong references to background tasks

    async def __wait(self, event, batch):
        try:
            await asyncio.wait_for(event.wait(), timeout=self.__timeout)
        except asyncio.TimeoutError:
            # Deadline reached before the batch filled up: detach it so
            # that the next _enqueue starts a fresh batch.
            if self.__event is event:
                self.__event = None
                self.__batch = None
        # Process the batch this task was created for; by now self.__batch
        # may already refer to a newer batch.
        await self.__run(batch)

    async def __run(self, batch):
        try:
            await self._process(batch)
        except Exception as e:
            for _, future in batch:
                if not future.done():
                    future.set_exception(e)
        else:
            for _, future in batch:
                if not future.done():
                    future.set_result(None)

    def _setup(self):
        """
        Initialises a new batch.
        """
        if self.__event is not None:
            return
        self.__batch = []
        self.__event = asyncio.Event()
        task = asyncio.create_task(self.__wait(self.__event, self.__batch))
        self.__tasks.add(task)
        task.add_done_callback(self.__tasks.discard)

    def _finish(self):
        """
        Marks the current batch as complete and starts processing it.
        """
        self.__batch = None
        self.__event.set()
        self.__event = None

    def _enqueue(self, item):
        """
        Adds an item to be processed in the next batch.
        Returns: an awaitable that will return the result of processing
        when awaited.
        """
        self._setup()
        future = asyncio.get_running_loop().create_future()
        self.__batch.append((item, future))
        if len(self.__batch) >= self.__maxsize:
            self._finish()
        return future

    @abc.abstractmethod
    async def _process(self, batch):
        """
        Processes the current batch. The batch parameter contains a list
        of pairs (item, future), where item is the value passed to _enqueue,
        while future is an asyncio.Future. Call the .set_result and/or
        .set_exception methods on the latter to return a result to the
        caller; if you don’t assign a result yourself, the returned value
        will be None.
        """
        raise NotImplementedError
You can subclass Batcher to create a façade around _enqueue that will validate the arguments and prepare them for processing.
Example:
import urllib.parse

def singleton(*args, **kwargs):
    def wrapper(cls):
        return cls(*args, **kwargs)
    return wrapper

@singleton(max_batch=50, timeout=10)
class get_metadata(Batcher):
    async def _process(self, batch):
        qs = "&".join(
            f"id={urllib.parse.quote(str(object_id))}"
            for object_id, _ in batch
        )
        # `session` is the same shared session object used in the question.
        response = await session.get("http://example.com/api/metadata?" + qs)
        response.raise_for_status()
        results = (await response.json())['results']
        for object_id, future in batch:
            if future.done():
                continue  # the caller stopped waiting (e.g. it was cancelled)
            try:
                # JSON object keys are strings; use results[str(object_id)]
                # if the API keys its results that way.
                future.set_result(results[object_id])
            except Exception as e:
                future.set_exception(e)

    async def __call__(self, object_id):
        if not isinstance(object_id, int):
            raise ValueError(object_id)
        return await self._enqueue(object_id)
Here, get_metadata is a class instance, but you can invoke it as you would a normal function, thanks to the __call__ special method.
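A stripped-down, network-free illustration of that trick: the `singleton` decorator replaces the class with a single instance of it, and an `async def __call__` makes that instance awaitable exactly like a coroutine function. The `describe` class and its `prefix` parameter are hypothetical, chosen only to keep the sketch self-contained.

```python
import asyncio

def singleton(*args, **kwargs):
    """Class decorator that replaces the class with one instance of it."""
    def wrapper(cls):
        return cls(*args, **kwargs)
    return wrapper

@singleton(prefix="item-")
class describe:  # hypothetical example class
    def __init__(self, *, prefix):
        self.prefix = prefix

    async def __call__(self, object_id):
        # `describe` is now an instance, so `await describe(3)` runs this method.
        await asyncio.sleep(0)
        return f"{self.prefix}{object_id}"

async def main():
    print(type(describe).__name__)  # describe
    print(await describe(3))        # item-3

asyncio.run(main())
```

Because the decorator runs at class-definition time, any constructor arguments (here `prefix`, in the answer `max_batch` and `timeout`) must be known when the module is imported.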
Upvotes: 1
Reputation: 154876
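Semaphores won't work here because they work in reverse from what you need: they don't block until a certain number of coroutines acquire them. You need an asyncio equivalent of a barrier, which unfortunately doesn't exist in the standard library. (asyncio.Barrier was added later, in Python 3.11, but it has no built-in way to release a partially filled group on a timeout, so it still doesn't fit this problem directly.)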
Fortunately it's not hard to implement a barrier using events and a list. You could do it like this (only vaguely tested):
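import asyncio

_waiters = []
_have_new_waiter = None

async def get_metadata(session, object_id):
    global _have_new_waiter
    if _have_new_waiter is None:
        _have_new_waiter = asyncio.Event()
        asyncio.create_task(_monitor_incoming(session))
    future = asyncio.get_running_loop().create_future()
    _waiters.append((object_id, future))
    _have_new_waiter.set()
    return await future

async def _monitor_incoming(session):
    loop = asyncio.get_running_loop()
    deadline = None
    while True:
        # The 10-second clock starts when the first ID of a batch arrives,
        # so a slow trickle of requests cannot postpone the flush forever.
        if _waiters and deadline is None:
            deadline = loop.time() + 10
        timeout = None if deadline is None else max(0, deadline - loop.time())
        try:
            await asyncio.wait_for(_have_new_waiter.wait(), timeout)
        except asyncio.TimeoutError:
            pass
        _have_new_waiter.clear()
        timed_out = deadline is not None and loop.time() >= deadline
        # Send every full batch of 50 right away; on timeout, also send the rest.
        flushed = False
        while len(_waiters) >= 50 or (timed_out and _waiters):
            lst = _waiters[:50]
            del _waiters[:50]
            asyncio.create_task(_get_batch(session, lst))
            flushed = True
        if flushed or not _waiters:
            deadline = None

async def _get_batch(session, waiter_lst):
    object_ids = [object_id for (object_id, _future) in waiter_lst]
    # Repeat the id parameter: ?id=1&id=2&id=3
    query = "&".join(f"id={object_id}" for object_id in object_ids)
    try:
        async with session.get(f"http://example.com/api/metadata?{query}") as response:
            response.raise_for_status()
            dct = (await response.json())['results']
    except Exception as e:
        for _object_id, future in waiter_lst:
            if not future.done():
                future.set_exception(e)
        return
    for object_id, future in waiter_lst:
        if future.done():
            continue  # the caller stopped waiting (e.g. it was cancelled)
        if object_id in dct:
            future.set_result(dct[object_id])
        else:
            future.set_exception(KeyError(object_id))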
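Since the question mentions trying queues, here is a minimal sketch of the same size-or-timeout batching built on asyncio.Queue instead of an Event-based barrier. It swaps the HTTP call for a hypothetical fake_fetch_many coroutine so it runs offline; QueueBatcher, fetch_many and batch_sizes are illustrative names, not part of either answer. The timeout here is counted from the moment the first ID of a batch arrives.

```python
import asyncio

async def fake_fetch_many(object_ids):
    """Hypothetical stand-in for the real batched API call."""
    await asyncio.sleep(0.01)  # pretend to do network I/O
    return {object_id: f"metadata for {object_id}" for object_id in object_ids}

class QueueBatcher:
    """Collects IDs from an asyncio.Queue into batches of up to max_batch,
    flushing early once `timeout` seconds have passed since the batch's first ID."""

    def __init__(self, fetch_many, *, max_batch=50, timeout=10.0):
        self._fetch_many = fetch_many
        self._max_batch = max_batch
        self._timeout = timeout
        self._queue = asyncio.Queue()
        self._worker = None
        self._tasks = set()    # strong references to in-flight batch tasks
        self.batch_sizes = []  # recorded only to make the demo observable

    async def get_metadata(self, object_id):
        # Same call shape as the original per-ID function.
        if self._worker is None:
            self._worker = asyncio.create_task(self._collect())
        future = asyncio.get_running_loop().create_future()
        self._queue.put_nowait((object_id, future))
        return await future

    async def _collect(self):
        loop = asyncio.get_running_loop()
        while True:
            batch = [await self._queue.get()]  # block until a batch's first ID
            deadline = loop.time() + self._timeout
            while len(batch) < self._max_batch:
                remaining = deadline - loop.time()
                if remaining <= 0:
                    break
                try:
                    batch.append(await asyncio.wait_for(self._queue.get(), remaining))
                except asyncio.TimeoutError:
                    break
            self.batch_sizes.append(len(batch))
            task = asyncio.create_task(self._dispatch(batch))
            self._tasks.add(task)
            task.add_done_callback(self._tasks.discard)

    async def _dispatch(self, batch):
        try:
            results = await self._fetch_many([object_id for object_id, _ in batch])
        except Exception as exc:
            for _, future in batch:
                if not future.done():
                    future.set_exception(exc)
            return
        for object_id, future in batch:
            if future.done():
                continue  # the caller was cancelled
            if object_id in results:
                future.set_result(results[object_id])
            else:
                future.set_exception(KeyError(object_id))

async def demo():
    batcher = QueueBatcher(fake_fetch_many, max_batch=50, timeout=0.5)
    results = await asyncio.gather(*(batcher.get_metadata(i) for i in range(120)))
    print(batcher.batch_sizes)  # [50, 50, 20]
    print(results[7])           # metadata for 7

asyncio.run(demo())
```

A single worker task owns the collection loop, so there is no shared flag to reset, and each batch is dispatched in its own task so a slow request does not delay collecting the next one. In real code, fake_fetch_many would be replaced by a coroutine that requests the id=1&id=2&... URL and returns the parsed 'results' mapping.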
Upvotes: 1