mac13k

Reputation: 2663

How to retry async requests upon ClientOSError: [Errno 104] Connection reset by peer?

I have a function in Google Cloud that accepts a number of parameters. I generate ~2k asynchronous requests with different combinations of parameter values using aiohttp:

import asyncio
import aiohttp

# url = 'https://...'
# headers = {'X-Header': 'value'}

timeout = aiohttp.ClientTimeout(total=72000000)

async def submit_bt(session, url, payload):
    async with session.post(url, json=payload) as resp:
        return await resp.text()  # return the body so gather() collects it

async def main():
    async with aiohttp.ClientSession(headers=headers, timeout=timeout) as session:
        tasks = []
        gen = payload_generator()  # a class that generates dictionaries
        for payload in gen.param_grid():
            tasks.append(asyncio.ensure_future(submit_bt(session, url, payload)))

        bt_results = await asyncio.gather(*tasks)
        for result in bt_results:
            pass

asyncio.run(main())

Each function invocation takes 3 to 6 minutes to run. The function timeout is set to 9 minutes and the maximum number of instances to 3000, but I never see more than 150-200 instances started, even when the total number of submitted requests is between 1.5k and 2.5k. On some occasions all requests are processed in 20 to 30 minutes, but sometimes I get an error on the client side:

ClientOSError: [Errno 104] Connection reset by peer

that does not correspond to any error on the server side. I think I should be able to catch it as aiohttp.client_exceptions.ClientOSError, but I am not sure how to handle it in an asynchronous setting so that the failed request is resubmitted and premature termination is avoided. Any hints are greatly appreciated.
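For illustration, one standard-library way to keep a single failure from ending the whole batch is to pass return_exceptions=True to asyncio.gather and resubmit whatever came back as an exception. This is only a minimal sketch under assumptions: fake_submit is a hypothetical stand-in for submit_bt that fails the first time it sees payload 2, and the single resubmission round is an arbitrary choice.

```python
import asyncio

seen = set()

async def fake_submit(payload):
    # Hypothetical stand-in for submit_bt: payload 2 is reset on its first attempt only.
    if payload == 2 and payload not in seen:
        seen.add(payload)
        raise ConnectionResetError(104, "Connection reset by peer")
    return f"done-{payload}"

async def run_batch(payloads):
    # return_exceptions=True stores exceptions in the result list
    # instead of raising the first one out of gather().
    results = await asyncio.gather(
        *(fake_submit(p) for p in payloads), return_exceptions=True
    )
    # Indices of the requests that failed with an OS-level connection error.
    failed = [i for i, r in enumerate(results) if isinstance(r, OSError)]
    # Resubmit only the failed payloads, once.
    retried = await asyncio.gather(
        *(fake_submit(payloads[i]) for i in failed), return_exceptions=True
    )
    for i, r in zip(failed, retried):
        results[i] = r
    return results

batch_results = asyncio.run(run_batch([1, 2, 3]))
```

The isinstance check against OSError is used here because aiohttp's ClientOSError derives from OSError; with real requests, checking for the narrower aiohttp class would likely be preferable.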

Upvotes: 4

Views: 5666

Answers (2)

mac13k

Reputation: 2663

The solution suggested by @vaizki in the comments seems to be working well for me. A closer look at the traceback showed that the exception was raised in the submit_bt coroutine, so I added a try-except clause:

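import asyncio
import random

import aiohttp

async def submit_bt(session, url, payload):
    try:
        async with session.post(url, json=payload) as resp:
            result = await resp.text()
    except aiohttp.client_exceptions.ClientOSError:
        # Connection reset: wait 3 to 12 seconds, then retry exactly once.
        # An exception raised by this second attempt is not caught here.
        await asyncio.sleep(3 + random.randint(0, 9))
        async with session.post(url, json=payload) as resp:
            result = await resp.text()
    except Exception as e:
        # Any other error: return its message as the result.
        result = str(e)
    return result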

It does not look very elegant with the repeated lines, but this is still a work in progress for me and the code structure is not formalized at this stage. Anyway it is clear to see what I wanted to achieve:

  • post the payload to the function URL,
  • catch exceptions, but repeat the post only in case of ClientOSError and only once.

I avoided a while True kind of loop so that a serious problem could not cause endless retries. I ran this code as is a couple of times and I know it worked through a couple of connection resets until the end of the task list, because I got all the results generated by the function, so even in this form it is robust enough for my circumstances.
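A bounded for loop is one way to drop the repeated lines without risking an infinite loop. The following is a minimal sketch, not the code that was actually run: the helper name retry_async, its parameters, and the demo coroutine flaky_post are assumptions made for illustration. It uses only the standard library so that it runs on its own.

```python
import asyncio
import random


async def retry_async(make_call, retry_on=(OSError,), attempts=2,
                      base_delay=3.0, jitter=9.0):
    """Await make_call() up to `attempts` times, retrying only on `retry_on`.

    make_call is a zero-argument callable that returns a fresh awaitable
    on every call (a coroutine object cannot be awaited twice).
    """
    for attempt in range(1, attempts + 1):
        try:
            return await make_call()
        except retry_on:
            if attempt == attempts:
                raise  # out of attempts: let the caller see the error
            # Randomized pause before the next attempt.
            await asyncio.sleep(base_delay + random.uniform(0, jitter))


# Demo with a hypothetical stand-in for the HTTP call: fails once, then succeeds.
calls = {"n": 0}

async def flaky_post():
    calls["n"] += 1
    if calls["n"] == 1:
        raise ConnectionResetError(104, "Connection reset by peer")
    return "ok"

result = asyncio.run(retry_async(flaky_post, base_delay=0, jitter=0))
```

With aiohttp the call might look like retry_async(lambda: post_once(session, url, payload), retry_on=(aiohttp.ClientOSError,)), where post_once is a hypothetical coroutine wrapping the session.post block. The default attempts=2 matches the "retry only once" behaviour described above; unlike the original, a failure on the last attempt is re-raised rather than turned into a string.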

Upvotes: 5

Rajeev Tirumalasetty

Reputation: 352

You can check this thread, which covers a similar issue. Based on the answer from that thread:

Cloud functions are stateless, but can re-use global state from previous invocations. This is explained in tips and these docs.

Using global state with retries should give you a more robust function:

You can import the tenacity library and wrap your function with the @retry decorator:

from tenacity import retry, stop_after_attempt, wait_random

# Up to 3 attempts, waiting a random 1 to 2 seconds between them.
@retry(stop=stop_after_attempt(3), wait=wait_random(min=1, max=2))
def function():
    ...  # body of the function to retry

Upvotes: 0
