metersk

Reputation: 12529

Python 3.6 async GET requests with aiohttp are running synchronously

I have the code below functioning properly, but for some reason the requests seem to be executing synchronously instead of asynchronously.

My assumption is that this is happening because of the for record in records loop in the main function, but I'm not sure how to change this so that the requests execute asynchronously. If this is not the case, what else would I need to change?

import asyncio
import aiohttp


async def do_request(query_string):
    base_url = 'https://maps.googleapis.com/maps/api/place/textsearch/json?'
    params = {'key': google_api_key,
              'query': query_string}
    async with aiohttp.ClientSession() as session:
        async with session.request('GET', base_url, params=params) as resp:
            return resp


async def main():
    create_database_and_tables()
    records = prep_sample_data()[:100]

    for record in records:
        r = Record(record)

        if not r.is_valid:
            continue

        query_string = r.generate_query_string()

        resp = await do_request(query_string)
        print("NOW WRITE TO DATABASE")

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

Upvotes: 4

Views: 2669

Answers (2)

NinjaKitty

Reputation: 638

Building off of Martijn's answer:

If the order of the requests doesn't matter too much to you (with respect to when each response gets written to the database), you can write responses to your database while the remaining requests are still being fetched.

Edit (to explain more): I use two semaphores here. One limits the number of open connections through aiohttp; the right value will depend on your system. Most Linux systems default to a limit of 1024 open file descriptors. In my own personal experience, setting it lower than your OS maximum is preferable.

max_coroutines solves the problem of having too many coroutines running at once.

I use asyncio.ensure_future() so that the coroutines start running while we are still building the list. This way, you're not creating the full list of coroutines before executing any of them.

import asyncio
import aiohttp


# Limit the number of open connections through aiohttp to 512.
max_request_semaphore = asyncio.BoundedSemaphore(512)
# Limit the number of coroutines alive at any one time to 10k.
max_coroutines = asyncio.BoundedSemaphore(10000)


async def process_response(response):
    print('Process your response to your database')


async def do_request(query_string):
    base_url = 'https://maps.googleapis.com/maps/api/place/textsearch/json?'
    params = {'key': google_api_key,
              'query': query_string}
    async with max_request_semaphore:
        async with aiohttp.ClientSession() as session:
            async with session.request('GET', base_url, params=params) as resp:
                # Read the body while the connection is still open;
                # resp itself is closed once the session exits.
                return await resp.json()


# Excuse me for the bad function naming
async def do_full_request(query_string):
    try:
        resp = await do_request(query_string)
        await process_response(resp)
    finally:
        # Release the slot even if the request fails.
        max_coroutines.release()


async def main():
    create_database_and_tables()
    records = prep_sample_data()[:100]

    requests = []
    for record in records:
        r = Record(record)

        if not r.is_valid:
            continue

        query_string = r.generate_query_string()

        # Will prevent more than 10k coroutines being created.
        await max_coroutines.acquire()
        requests.append(
            asyncio.ensure_future(
                do_full_request(query_string)))

    # Now gather all the coroutines
    await asyncio.gather(*requests)


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

Upvotes: 2

Martijn Pieters

Reputation: 1124348

You are awaiting separate do_request() calls in sequence. Instead of awaiting them directly (which blocks until the coroutine is done), use the asyncio.gather() function to have the event loop run them concurrently:

async def main():
    create_database_and_tables()
    records = prep_sample_data()[:100]

    requests = []
    for record in records:
        r = Record(record)

        if not r.is_valid:
            continue

        query_string = r.generate_query_string()

        requests.append(do_request(query_string))

    for resp in await asyncio.gather(*requests):
        print("NOW WRITE TO DATABASE")

The asyncio.gather() return value is a list of all the results the coroutines returned, in the same order you passed them to the gather() function.

If you need the original records to process the responses, you can pair up record and query string in several different ways:

  • store the valid records in a separate list and use zip() to pair them up again as you process the responses (see the sketch after this list)
  • use a helper coroutine that takes the valid record, produces a query string, invokes the request, and returns the record and response together as a tuple.
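For instance, a minimal sketch of the zip() approach, reusing do_request() and Record from the question (valid_records is a hypothetical list kept in step with the requests list):

async def main():
    create_database_and_tables()
    records = prep_sample_data()[:100]

    valid_records = []  # hypothetical: grows in lockstep with requests
    requests = []
    for record in records:
        r = Record(record)
        if not r.is_valid:
            continue
        valid_records.append(r)
        requests.append(do_request(r.generate_query_string()))

    # gather() preserves order, so each response lines up with its record.
    responses = await asyncio.gather(*requests)
    for r, resp in zip(valid_records, responses):
        print("NOW WRITE TO DATABASE", r, resp)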

You can also mix the response handling into a gathered coroutine: one that takes a record, produces the query string, awaits on do_request(), and then stores the result in the database when the response is ready.
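A rough sketch of that combined coroutine (handle_record and write_to_database are hypothetical names standing in for your own code):

async def handle_record(r):
    query_string = r.generate_query_string()
    resp = await do_request(query_string)
    # write_to_database is a placeholder for your own storage code.
    await write_to_database(r, resp)


async def main():
    create_database_and_tables()
    records = prep_sample_data()[:100]

    tasks = []
    for record in records:
        r = Record(record)
        if r.is_valid:
            tasks.append(handle_record(r))

    await asyncio.gather(*tasks)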

In other words, divide the work that needs to happen consecutively into coroutines, and gather those.

Upvotes: 6
