Mitar

Reputation: 6980

Combining multiple API requests into one when using Python aiohttp

I have an API endpoint that takes one or more object IDs and returns results for all of them, e.g., http://example.com/api/metadata?id=1&id=2&id=3. The endpoint is rate limited per call, not per ID, so it is better to call it with many IDs at once.
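As an illustration, a URL of that multi-ID form can be built from a list of IDs with the standard library's urllib.parse.urlencode, passing a list of (key, value) pairs so that the id key repeats. The helper name build_metadata_url is hypothetical, and example.com stands in for the real endpoint:

```python
from urllib.parse import urlencode

BASE_URL = "http://example.com/api/metadata"  # placeholder endpoint from the question

def build_metadata_url(object_ids):
    # A list of pairs (rather than a dict) lets the "id" key appear once per ID
    query = urlencode([("id", object_id) for object_id in object_ids])
    return f"{BASE_URL}?{query}"

print(build_metadata_url([1, 2, 3]))  # http://example.com/api/metadata?id=1&id=2&id=3
```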

My existing code, however, fetches metadata one ID at a time:

async def get_metadata(object_id):
    response = await session.get(f"http://example.com/api/metadata?id={object_id}")
    response.raise_for_status()
    return (await response.json())['results'][object_id]

I would like to keep the signature of this function the same, but change it so that it does not make individual requests. Instead, each call should block until either (a) 50 IDs are ready to be fetched, or (b) a timeout (say, 10 seconds) expires while some, but fewer than 50, IDs are waiting. Then a single API request is made, and each blocked call to get_metadata returns its corresponding result. The external behavior of get_metadata should stay the same.

I tried a few things using semaphores and queues, but got stuck. What would be a good approach to implement this?

Upvotes: 3

Views: 2108

Answers (2)

dumbass

Reputation: 27205

After a while of tinkering, I came up with this:

import abc, asyncio

class Batcher(metaclass=abc.ABCMeta):
    def __init__(self, *, max_batch, timeout):
        """
        Constructs a new Batcher. The parameter max_batch specifies
        the queue capacity, while timeout is the deadline after which
        a queue will be processed whether it’s at capacity or not.
        """
        self.__batch = None
        self.__event = None
        self.__timeout = timeout
        self.__maxsize = max_batch
        # Strong references to background tasks, so they are not
        # garbage-collected before they finish.
        self.__tasks = set()

    async def __wait(self, event, batch):
        try:
            await asyncio.wait_for(event.wait(), timeout=self.__timeout)
        except asyncio.TimeoutError:
            # Deadline reached: detach this batch (unless _finish already
            # did) so that the next _enqueue starts a fresh one.
            if self.__event is event:
                self.__batch = None
                self.__event = None
        # Process the batch this task was created for; self.__batch may
        # already refer to a newer batch at this point.
        await self.__run(batch)

    async def __run(self, batch):
        try:
            await self._process(batch)
        except Exception as e:
            for _, future in batch:
                if future.done():
                    continue
                future.set_exception(e)
        else:
            for _, future in batch:
                if future.done():
                    continue
                future.set_result(None)

    def _setup(self):
        """
        Initialises a new batch.
        """
        if self.__event is not None:
            return
        self.__batch = []
        self.__event = asyncio.Event()
        task = asyncio.create_task(self.__wait(self.__event, self.__batch))
        self.__tasks.add(task)
        task.add_done_callback(self.__tasks.discard)

    def _finish(self):
        """
        Marks the current batch as complete and starts processing it.
        """
        self.__batch = None
        self.__event.set()
        self.__event = None

    def _enqueue(self, item):
        """
        Adds an item to be processed in the next batch.

        Returns: an awaitable that will return the result of processing
        when awaited.
        """
        self._setup()
        future = asyncio.get_running_loop().create_future()
        self.__batch.append((item, future))
        if len(self.__batch) >= self.__maxsize:
            self._finish()
        return future

    @abc.abstractmethod
    async def _process(self, batch):
        """
        Processes the current batch. The batch parameter contains a list
        of pairs (item, future), where item is the value passed to _enqueue,
        while future is an asyncio.Future. Call the .set_result and/or
        .set_exception methods on the latter to return a result to the
        caller; if you don’t assign a result yourself, the returned value
        will be None.
        """
        raise NotImplementedError
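Both timing paths in Batcher rest on one primitive: asyncio.wait_for(event.wait(), timeout) resumes the waiting task either when the event is set (here, by _finish once the batch is full) or when the timeout expires. A self-contained sketch of just that primitive; the names wait_or_timeout and demo are hypothetical:

```python
import asyncio

async def wait_or_timeout(event, timeout):
    """Return True if `event` is set within `timeout` seconds, else False."""
    try:
        await asyncio.wait_for(event.wait(), timeout)
        return True
    except asyncio.TimeoutError:
        return False

async def demo():
    full = asyncio.Event()
    # Simulate a batch filling up shortly: a callback sets the event after 10 ms
    asyncio.get_running_loop().call_later(0.01, full.set)
    filled = await wait_or_timeout(full, timeout=1.0)
    # Nobody ever sets this event, so the timeout path is taken
    expired = await wait_or_timeout(asyncio.Event(), timeout=0.05)
    return filled, expired

print(asyncio.run(demo()))  # (True, False)
```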

You can subclass Batcher to create a façade around _enqueue that will validate the arguments and prepare them for processing.

Example:

import urllib.parse

def singleton(*args, **kwargs):
    def wrapper(cls):
        return cls(*args, **kwargs)
    return wrapper

@singleton(max_batch=50, timeout=10)
class get_metadata(Batcher):
    async def _process(self, batch):
        qs = "&".join(
            f"id={urllib.parse.quote(str(object_id))}"
            for object_id, _ in batch
        )

        # `session` is an existing aiohttp.ClientSession, as in the question
        async with session.get("http://example.com/api/metadata?" + qs) as response:
            response.raise_for_status()
            results = (await response.json())['results']

        for object_id, future in batch:
            if future.done():  # e.g. the caller was cancelled
                continue
            try:
                # Assuming 'results' is a JSON object keyed by ID: JSON keys
                # are always strings, so look up the integer ID as a string
                future.set_result(results[str(object_id)])
            except Exception as e:
                future.set_exception(e)

    async def __call__(self, object_id):
        if not isinstance(object_id, int):
            raise TypeError(object_id)
        return await self._enqueue(object_id)

Here, get_metadata is an instance of a class rather than a function, but thanks to the __call__ special method you can call and await it exactly like the original coroutine function, e.g. await get_metadata(42).
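To see the singleton decorator and an async __call__ working together without any network access, here is a toy stand-in; the class greet and its greeting parameter are hypothetical, chosen only for illustration:

```python
import asyncio

def singleton(*args, **kwargs):
    # Replaces the decorated class with one instance built from these arguments
    def wrapper(cls):
        return cls(*args, **kwargs)
    return wrapper

@singleton(greeting="hello")
class greet:
    def __init__(self, greeting):
        self.greeting = greeting

    async def __call__(self, name):
        # Calling the instance returns a coroutine, just like a coroutine function
        await asyncio.sleep(0)
        return f"{self.greeting}, {name}"

print(type(greet).__name__)         # greet (the name is bound to an instance)
print(asyncio.run(greet("world")))  # hello, world
```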

Upvotes: 1

user4815162342

Reputation: 154876

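Semaphores won't work here because they work in reverse from what you need: they limit how many coroutines may proceed at once, rather than blocking until a certain number of coroutines have arrived. You need an asyncio equivalent of a barrier, which unfortunately didn't exist in the standard library when this was written. (Python 3.11 added asyncio.Barrier, but its wait() has no timeout, so it doesn't by itself cover releasing a partial batch after 10 seconds.)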
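To make the contrast concrete: with asyncio.Semaphore(50), a lone caller gets through immediately instead of waiting for 49 others, and acquire() only blocks once the limit is exhausted. A minimal sketch; the function name semaphore_demo is hypothetical:

```python
import asyncio

async def semaphore_demo():
    sem = asyncio.Semaphore(50)
    # A lone caller acquires at once; the short timeout would fire if it blocked
    await asyncio.wait_for(sem.acquire(), timeout=0.1)
    alone_got_through = True
    sem.release()

    full = asyncio.Semaphore(1)
    await full.acquire()
    # Only when the limit is used up does a second acquire() block
    try:
        await asyncio.wait_for(full.acquire(), timeout=0.05)
        blocked = False
    except asyncio.TimeoutError:
        blocked = True
    return alone_got_through, blocked

print(asyncio.run(semaphore_demo()))  # (True, True)
```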

Fortunately it's not hard to implement a barrier using events and a list. You could do it like this (only vaguely tested):

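import asyncio

_waiters = []
_have_new_waiter = None
_background_tasks = set()  # strong references so tasks aren't garbage-collected

def _spawn(coro):
    task = asyncio.create_task(coro)
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)

async def get_metadata(session, object_id):
    global _have_new_waiter
    if _have_new_waiter is None:
        _have_new_waiter = asyncio.Event()
        _spawn(_monitor_incoming(session))

    future = asyncio.get_running_loop().create_future()
    _waiters.append((object_id, future))
    _have_new_waiter.set()
    return await future

async def _monitor_incoming(session):
    loop = asyncio.get_running_loop()
    deadline = None  # when the current partial batch must be sent
    while True:
        # Sleep until a new waiter arrives or the batch deadline passes
        timeout = None if deadline is None else max(0, deadline - loop.time())
        try:
            await asyncio.wait_for(_have_new_waiter.wait(), timeout)
        except asyncio.TimeoutError:
            pass
        _have_new_waiter.clear()
        if not _waiters:
            continue
        if deadline is None:
            # Start the 10-second clock at the batch's first waiter
            deadline = loop.time() + 10
        if len(_waiters) < 50 and loop.time() < deadline:
            continue
        lst = _waiters[:]
        del _waiters[:]
        deadline = None
        _spawn(_get_batch(session, lst))

async def _get_batch(session, waiter_lst):
    object_ids = [object_id for (object_id, _future) in waiter_lst]
    query = "&".join(f"id={object_id}" for object_id in object_ids)
    try:
        async with session.get(f"http://example.com/api/metadata?{query}") as response:
            response.raise_for_status()
            dct = (await response.json())['results']
        results = [dct[object_id] for object_id in object_ids]
    except Exception as e:
        # Propagate the failure to every caller waiting on this batch
        for _object_id, future in waiter_lst:
            if not future.done():
                future.set_exception(e)
        return
    for result, (_object_id, future) in zip(results, waiter_lst):
        if not future.done():
            future.set_result(result)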
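To try the same event-and-list idea without a server, here is a minimal self-contained sketch, not the code above: it replaces the Event-plus-monitor loop with a per-batch loop.call_later timer (so the deadline, shortened here from 10 s to 0.05 s, starts at a batch's first caller), and a hypothetical fake_fetch_many coroutine stands in for the HTTP request. BatchGate and its parameters are likewise hypothetical names.

```python
import asyncio

class BatchGate:
    """Groups concurrent get() calls into batches of up to max_batch IDs,
    flushing a partial batch `timeout` seconds after its first caller."""

    def __init__(self, fetch_many, max_batch, timeout):
        self.fetch_many = fetch_many  # async callable: list of IDs -> dict ID -> result
        self.max_batch = max_batch
        self.timeout = timeout
        self.pending = []             # (object_id, future) pairs of the open batch
        self.timer = None             # deadline handle of the open batch
        self.tasks = set()            # strong references to flush tasks

    async def get(self, object_id):
        loop = asyncio.get_running_loop()
        future = loop.create_future()
        self.pending.append((object_id, future))
        if len(self.pending) >= self.max_batch:
            self._flush()
        elif self.timer is None:
            # First caller of a new batch: start its deadline
            self.timer = loop.call_later(self.timeout, self._flush)
        return await future

    def _flush(self):
        if self.timer is not None:
            self.timer.cancel()
            self.timer = None
        batch, self.pending = self.pending, []
        if batch:
            task = asyncio.create_task(self._run(batch))
            self.tasks.add(task)
            task.add_done_callback(self.tasks.discard)

    async def _run(self, batch):
        try:
            results = await self.fetch_many([object_id for object_id, _ in batch])
        except Exception as e:
            for _, future in batch:
                if not future.done():
                    future.set_exception(e)
            return
        for object_id, future in batch:
            if future.done():
                continue
            if object_id in results:
                future.set_result(results[object_id])
            else:
                future.set_exception(KeyError(object_id))

calls = []  # size of each batch the fake backend received

async def fake_fetch_many(ids):
    # Stand-in for the HTTP request: record the batch size, echo each ID doubled
    calls.append(len(ids))
    await asyncio.sleep(0)
    return {i: i * 2 for i in ids}

async def main():
    gate = BatchGate(fake_fetch_many, max_batch=50, timeout=0.05)
    return await asyncio.gather(*(gate.get(i) for i in range(120)))

results = asyncio.run(main())
print(calls)        # [50, 50, 20]
print(results[:3])  # [0, 2, 4]
```

With 120 concurrent callers and max_batch=50, the fake backend sees two full batches and then one partial batch flushed by the timer.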

Upvotes: 1
