Reputation: 107
I'm trying to make async API calls this way:
async def get_data(client, postdata):
    res = await client.post(url=_url, headers=_headers, data=postdata)
    return res

async def parse_res(client, postdata):
    res = await get_data(client, postdata)
    if bool(json.loads(res.text)['suggestions']):
        _oks = <...grab some JSON fields...>
    else:
        _oks = {}
    return _oks

async def main(_jobs):
    async with httpx.AsyncClient() as client:
        calls = []
        for job in _jobs:
            _postdata = '{ "query": "' + job + '" }'
            calls.append(asyncio.create_task(parse_res(client, _postdata)))
        batch = await asyncio.gather(*calls)
        return batch
and then just run main().
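For example, something like this (a minimal sketch; the two strings here are just stand-ins for the real job list):

jobs = ["query one", "query two"]   # stand-in for the real list of query strings
results = asyncio.run(main(jobs))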
But the API can only handle about 30-50 nearly simultaneous requests; beyond that it starts throwing HTTP 429 errors.
So I need to send the calls in batches of 30 and work through all 10,000 requests in chunks.
How do I process 10,000 (ten thousand) API calls in batches of 30?
Upvotes: 4
Views: 7526
Reputation: 15689
You could use Simon Hawe's answer; however, here's a different approach that doesn't use any external libraries.
Use asyncio.Semaphore to limit the number of calls made concurrently; each time the semaphore is released, it lets another waiting task run.
import asyncio
import json

import httpx

sem = asyncio.Semaphore(30)  # max number of simultaneous requests

async def get_data(client, postdata):
    async with sem:  # blocks here until one of the 30 slots is free
        res = await client.post(url=_url, headers=_headers, data=postdata)
        return res

async def parse_res(client, postdata):
    res = await get_data(client, postdata)
    if bool(json.loads(res.text)['suggestions']):
        _oks = <...grab some JSON fields...>
    else:
        _oks = {}
    return _oks

async def main(_jobs):
    async with httpx.AsyncClient() as client:
        calls = [
            asyncio.create_task(parse_res(client, '{ "query": "' + job + '" }'))
            for job in _jobs
        ]
        return await asyncio.gather(*calls)
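To drive it, for example (assuming _url, _headers, and the list of jobs are defined as in the question):

results = asyncio.run(main(jobs))  # jobs: the list of 10,000 query strings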
Upvotes: 6
Reputation: 4529
One library that comes in handy here is funcy. It offers various helpers for working with sequences. One of them is chunks, which splits a sequence into chunks of equal size (the last chunk may be smaller if the total size doesn't divide evenly).
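For illustration, the chunking behaves like this:

from funcy import chunks

print(list(chunks(3, [1, 2, 3, 4, 5, 6, 7, 8])))
# [[1, 2, 3], [4, 5, 6], [7, 8]]  -> last chunk is shorter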
from funcy import chunks

result = []
for job_chunk in chunks(30, _jobs):  # split the jobs into chunks of 30
    calls = [parse_res(client, '{ "query": "' + job + '" }') for job in job_chunk]
    batch = await asyncio.gather(*calls)  # run one chunk of 30 concurrently
    result.extend(batch)
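This loop has to live inside a coroutine that owns the client. A minimal sketch of how it could be wired up, assuming the parse_res coroutine from the question is defined:

import asyncio

import httpx
from funcy import chunks

async def main(jobs):
    result = []
    async with httpx.AsyncClient() as client:
        for job_chunk in chunks(30, jobs):
            calls = [parse_res(client, '{ "query": "' + job + '" }') for job in job_chunk]
            result.extend(await asyncio.gather(*calls))  # one batch of 30 at a time
    return result

# driven with: results = asyncio.run(main(jobs))

Note the trade-off versus the semaphore approach: each batch waits for its slowest request before the next batch starts, whereas a semaphore keeps 30 requests in flight continuously.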
Upvotes: 2