Reputation: 2663
I have a function in Google Cloud that accepts a number of parameters. I generate ~2k asynchronous requests with different combinations of parameter values using aiohttp:
import asyncio
import aiohttp

url = 'https://...'  # endpoint elided in the original
headers = {'X-Header': 'value'}
# total=72000000 seconds is effectively no overall timeout
timeout = aiohttp.ClientTimeout(total=72000000)

async def submit_bt(session, url, payload):
    async with session.post(url, json=payload) as resp:
        result = await resp.text()
    return result  # without this, gather() collects only None

async def main():
    async with aiohttp.ClientSession(headers=headers, timeout=timeout) as session:
        tasks = []
        gen = payload_generator()  # a class that generates dictionaries
        for payload in gen.param_grid():
            tasks.append(asyncio.ensure_future(submit_bt(session, url, payload)))
        bt_results = await asyncio.gather(*tasks)
        for result in bt_results:
            pass  # process each result here

asyncio.run(main())
The function takes between 3 and 6 minutes to run. Its timeout is set to 9 minutes and the maximum number of instances to 3000, but I never see more than 150-200 instances being started, even when the total number of submitted requests is between 1.5k and 2.5k. On some occasions all requests are processed in 20 to 30 minutes, but sometimes I get an error on the client side:
ClientOSError: [Errno 104] Connection reset by peer
that does not correspond to any error on the server side. I think I should be able to catch it as an aiohttp.client_exceptions.ClientOSError exception, but I am not sure how to handle it in an asynchronous setting so that the failed request is resubmitted and premature termination is avoided. Any hints are greatly appreciated.
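One way to keep a single reset from terminating the whole batch is to let asyncio.gather collect exceptions instead of raising them (return_exceptions=True) and then resubmit only the payloads that failed, for a bounded number of rounds. The following is a minimal stdlib-only sketch, not a definitive implementation: run_with_resubmission, max_rounds and retry_on are hypothetical names, and `submit` is assumed to be a coroutine function taking one payload, like submit_bt. The default retry_on=(OSError,) works because aiohttp's ClientOSError subclasses OSError; passing (aiohttp.ClientOSError,) restricts it to exactly that error.

```python
import asyncio


async def run_with_resubmission(submit, payloads, *, max_rounds=3,
                                retry_on=(OSError,)):
    """Run submit(payload) for every payload and resubmit only those that
    failed with a retryable exception, for at most `max_rounds` rounds.

    Returns (results, failures): results maps payload index -> result,
    failures maps payload index -> the last exception seen for it.
    """
    results = {}
    failures = {}
    pending = list(range(len(payloads)))
    for _ in range(max_rounds):
        if not pending:
            break
        # return_exceptions=True: a failed request becomes an item in the
        # output list instead of aborting the whole gather()
        outcomes = await asyncio.gather(
            *(submit(payloads[i]) for i in pending), return_exceptions=True
        )
        still_pending = []
        for i, outcome in zip(pending, outcomes):
            if isinstance(outcome, retry_on):
                failures[i] = outcome      # retryable: try again next round
                still_pending.append(i)
            elif isinstance(outcome, BaseException):
                failures[i] = outcome      # not retryable: record, stop
            else:
                results[i] = outcome
                failures.pop(i, None)      # succeeded after an earlier failure
        pending = still_pending
    return results, failures
```

Inside main() this could be called as `results, failures = await run_with_resubmission(lambda p: submit_bt(session, url, p), list(gen.param_grid()))`, after which `failures` lists the payloads that still failed once the rounds ran out.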
Upvotes: 4
Views: 5666
Reputation: 2663
The solution suggested by @vaizki in the comments seems to work well for me. After a closer look at the traceback, it turned out that the exception was raised in the submit_bt coroutine, so I added a try-except clause:
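import asyncio
import random

import aiohttp

async def submit_bt(session, url, payload):
    try:
        async with session.post(url, json=payload) as resp:
            result = await resp.text()
    except aiohttp.client_exceptions.ClientOSError:
        # connection reset: wait 3-12 s, then resubmit once; a second
        # ClientOSError raised here is not caught by the clause below
        await asyncio.sleep(3 + random.randint(0, 9))
        async with session.post(url, json=payload) as resp:
            result = await resp.text()
    except Exception as e:
        # any other error is returned as text instead of aborting gather()
        result = str(e)
    return result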
It does not look very elegant with the repeated lines, but this is still a work in progress for me and the code structure is not formalized at this stage. Anyway, the intent is clear: resubmit a request after a ClientOSError, and only once. I did not want to use a while True kind of loop, to avoid infinite execution in case of some serious issue. I ran this code as is a couple of times, and I know it worked through a couple of connection resets until the end of the task list, because I got all the results generated by the function. So even in this form it is robust enough for my circumstances.
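The repeated lines can be folded into a bounded loop, which keeps the "at most N attempts" guarantee without a while True. Below is a minimal sketch under stated assumptions: retry_async is a hypothetical helper, its default delay mirrors the 3 + randint(0, 9) pause above, and the submit_bt shown is an illustrative rewiring rather than the author's code. Note two differences from the code above: the default retry_on=(OSError,) is broader than ClientOSError (which subclasses OSError), so pass retry_on=(aiohttp.ClientOSError,) to match it exactly; and a failure on the last attempt lands in the except Exception branch and is returned as text instead of propagating.

```python
import asyncio
import random


async def retry_async(make_call, *, attempts=3, retry_on=(OSError,),
                      base_delay=3.0, jitter=9.0):
    """Await make_call() up to `attempts` times.

    Only exceptions listed in `retry_on` trigger another attempt; any other
    exception, or the failure on the last attempt, propagates to the caller.
    """
    for attempt in range(1, attempts + 1):
        try:
            return await make_call()
        except retry_on:
            if attempt == attempts:
                raise  # bounded: give up instead of looping forever
            # fixed pause plus random jitter before resubmitting
            await asyncio.sleep(base_delay + random.uniform(0, jitter))


async def submit_bt(session, url, payload):
    # hypothetical rewiring of the author's coroutine: the request is
    # wrapped in a zero-argument coroutine so it can be re-created per attempt
    async def post_once():
        async with session.post(url, json=payload) as resp:
            return await resp.text()

    try:
        return await retry_async(post_once)
    except Exception as e:
        return str(e)
```

Because post_once builds a fresh request on every call, each attempt opens a new session.post() context rather than reusing a response that was already reset.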
Upvotes: 5
Reputation: 352
You can check this thread, which describes a similar issue. Based on the answer there:
Cloud Functions are stateless, but can re-use global state from previous invocations. This is explained in tips and these docs.
Using global state with retries should give you a more robust function:
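You can import the following library and apply its @retry decorator to your cloud function:
from tenacity import retry, stop_after_attempt, wait_random

# up to 3 attempts, waiting a random 1-2 s between them
@retry(stop=stop_after_attempt(3), wait=wait_random(min=1, max=2))
def function():
    ...  # body of your function; an exception raised here triggers a retry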
Upvotes: 0