Roy King

Reputation: 33

How can I retry a task if it fails?

There is a function, make_request, that makes an HTTP request to the API. I can't make more than 3 requests per second.

I did something like this:

coroutines = [make_request(...) for ... in ...]
tasks = []
for coroutine in coroutines:
   tasks.append(asyncio.create_task(coroutine))
   await asyncio.sleep(1 / 3)
responses = await asyncio.gather(*tasks)

But I also can't make more than 1000 requests per hour. (I could probably use a delay of 3600 / 1000 seconds.) And what if the internet connection is lost? I should try to make the request again.
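
As a rough check of that arithmetic: if requests are spaced evenly, each limit implies a minimum gap between requests, and the largest gap is the one that matters. The helper name min_delay below is hypothetical, not part of asyncio or of the API client. Even spacing is also stricter than the limits strictly require, since it rules out short bursts.

```python
def min_delay(limits):
    """Return the smallest even spacing (in seconds) that satisfies every limit.

    `limits` is an iterable of (max_requests, period_seconds) pairs.
    Hypothetical helper for illustration only.
    """
    return max(period / count for count, period in limits)


# 3 requests per second and 1000 requests per hour
delay = min_delay([(3, 1), (1000, 3600)])
```

With these two limits the hourly one dominates, so a single delay of 3600 / 1000 seconds satisfies both.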

I can wrap make_request like this:

async def try_make_request(...):
   while True:
      try:
         return await make_request(...)
      except APIError as err:
         logging.exception(...)

In this case, more than 3 requests per second may be made, because the retries are not delayed.

I found this solution, but I'm not sure it's the best one:

pending = []
coroutines = [...]
for coroutine in coroutines:
    pending.append(asyncio.create_task(coroutine))
    await asyncio.sleep(1 / 3)
result = []
while True:
    finished, pending = await asyncio.wait(
        pending, return_when=asyncio.FIRST_EXCEPTION
    )
    for task in finished:
        exc = task.exception()
        if isinstance(exc, APIError) and exc.code == 29:
            pending.add(task.get_coro()) # since python 3.8
        if exc:
            logging.exception(...)
        else:
            result.append(task.result())
    if not pending:
        break

Upvotes: 3

Views: 739

Answers (1)

user4815162342

Reputation: 155296

If I understand the requirement correctly, you must not initiate connections less than 3.6 seconds apart. One way to achieve that is to have a timer that is reset each time a connection is initiated and expires 3.6 s later, allowing the next connection to be initiated. For example:

class Limiter:
    def __init__(self, delay):
        self.delay = delay
        self._ready = asyncio.Event()
        self._ready.set()

    async def wait(self):
        # wait in a loop because if there are multiple waiters,
        # the wakeup can be spurious
        while not self._ready.is_set():
            await self._ready.wait()
        # We got the slot and can proceed with the download.
        # Before doing so, clear the ready flag to prevent other waiters
        # from commencing downloads until the delay elapses again.
        self._ready.clear()
        asyncio.get_running_loop().call_later(self.delay, self._ready.set)

Then try_make_request could look like this:

async def try_make_request(limiter, ...):
    while True:
        await limiter.wait()
        try:
            return await make_request(...)
        except APIError as err:
            logging.exception(...)

...and the main coroutine can await all the try_make_request calls in parallel:

limiter = Limiter(3600/1000)
responses = await asyncio.gather(*[try_make_request(limiter, ...) for ... in ...])

Upvotes: 1
