Ciro

Reputation: 107

Python async API requests in batches

I'm trying to make async API calls this way:

  1. func to send request:
async def get_data(client, postdata):
    res = await client.post(url=_url, headers=_headers, data=postdata)

    return res
  2. func to parse JSON:
async def parse_res(client, postdata):
    
    res = await get_data(client, postdata)
    
    if bool(json.loads(res.text)['suggestions']):
        _oks = <...grab some JSON fields...>
    else:
        _oks = {}
    return _oks
  3. I wrap these two funcs in main():
async def main(_jobs):
    
    async with httpx.AsyncClient() as client:
        
        batch = []
        calls = []
    
        for job in _jobs:

            _postdata = '{ "query": "'+ job + '" }'

            calls.append(asyncio.create_task(parse_res(client, _postdata)))
            
        batch = await asyncio.gather(*calls)
            
        return batch

and then just run main().

But the API can only handle about 30-50 fast (nearly simultaneous) requests before it throws an HTTP 429 error.

So I need to send the calls in batches of 30 and process the 10,000 requests in chunks.

How do I process 10,000 (ten thousand) API calls in batches of 30?

Upvotes: 4

Views: 7526

Answers (2)

Łukasz Kwieciński

Reputation: 15689

You could use Simon Hawe's answer, but here's a different approach that doesn't need any external libraries.

Use asyncio.Semaphore to limit the number of calls made concurrently; whenever a task releases the semaphore, another waiting task can acquire it and run.

import asyncio

sem = asyncio.Semaphore(30)  # no. of simultaneous requests

async def get_data(client, postdata):
    async with sem:
        res = await client.post(url=_url, headers=_headers, data=postdata)
    return res


async def parse_res(client, postdata):
    res = await get_data(client, postdata)
    if bool(json.loads(res.text)['suggestions']):
        _oks = <...grab some JSON fields...>
    else:
        _oks = {}
    return _oks


async def main(_jobs):
    async with httpx.AsyncClient() as client:
        calls = [
            asyncio.create_task(parse_res(client, '{"query": "' + job + '"}'))
            for job in _jobs
        ]

        return await asyncio.gather(*calls)

Upvotes: 6

Simon Hawe

Reputation: 4529

One library that comes in handy here is funcy. It offers various helpers for working with sequences; one of them is chunks, which splits a sequence into chunks of equal size (the last chunk may be smaller if the total length doesn't divide evenly).

from funcy import chunks
result = []
for job_chunk in chunks(30, _jobs):
    calls = [parse_res(client, '{ "query": "'+ job + '" }') for job in job_chunk]
    batch = await asyncio.gather(*calls)
    result.extend(batch)
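
If you'd rather not add funcy as a dependency, the same chunking is a few lines of slicing. Here is a self-contained sketch of the batch-then-gather loop; fake_parse is a hypothetical stand-in for the real parse_res, and the job list is made up for illustration:

```python
import asyncio

def chunks(size, seq):
    """Yield successive chunks of up to `size` items from `seq`."""
    for i in range(0, len(seq), size):
        yield seq[i:i + size]

async def fake_parse(job):
    await asyncio.sleep(0)   # stand-in for the real request + JSON parsing
    return job.upper()

async def main(jobs):
    results = []
    for chunk in chunks(30, jobs):
        # each batch of up to 30 runs concurrently; batches run sequentially
        batch = await asyncio.gather(*(fake_parse(job) for job in chunk))
        results.extend(batch)
    return results

out = asyncio.run(main([f"job{i}" for i in range(100)]))
print(len(out))   # 100
```

Note that unlike the semaphore approach, this waits for the slowest call in each batch before starting the next one, so total wall-clock time can be somewhat longer.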
Upvotes: 2
