tavor999

Reputation: 477

Asyncio is not working within my Python 3.7 Lambda

I am trying to create a Python 3.7 Lambda that correctly uses asyncio to run tasks concurrently.

I have tried many different code variations; here is the latest block. I am using AWS X-Ray to look at the timing, and it is easy to verify that the async code is not working as intended: all of these tasks and calls run one after another.

import json
import boto3
import asyncio
from botocore.exceptions import ClientError
from aws_xray_sdk.core import xray_recorder
from aws_xray_sdk.core import patch_all

#xray
patch_all()

def lambda_handler(event, context):  
    tasks = []
    dict_to_populate = {}

    for item in list:
        tasks.append(asyncio.ensure_future(do_work(item, dict_to_populate)))

    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.gather(*tasks))
    loop.close


async def do_work(item, dict_to_populate):
    #assume regions are obtained
    for region in regions:
        response_vpcs = describe_vpcs(obj['Id'], session_assumed, region)

        if 'Vpcs' in response_vpcs:
            for vpc in response_vpcs['Vpcs']:
                #process
                pass  # placeholder so the loop body is valid Python

I expected the do_work calls to start at essentially the same time (asynchronously), but according to X-Ray they run one after another. The results are correct (dict_to_populate is populated as expected); the work is just not concurrent.
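The behaviour described above can be reproduced without boto3 or X-Ray. The sketch below is not the original Lambda: `describe_vpcs_stub` is a hypothetical stand-in that simply sleeps, on the assumption that the real `describe_vpcs` helper makes a blocking boto3 call. It compares a coroutine that calls the blocking function directly with one that hands it to a worker thread through `loop.run_in_executor`.

```python
import asyncio
import time


def describe_vpcs_stub(region):
    # Hypothetical stand-in for a blocking boto3 call (assumption, not the real helper).
    time.sleep(0.2)
    return {"Vpcs": [{"VpcId": "vpc-" + region}]}


async def do_work_blocking(region, out):
    # A blocking call inside a coroutine never yields to the event loop,
    # so the other tasks cannot start until it returns.
    out[region] = describe_vpcs_stub(region)


async def do_work_offloaded(region, out):
    loop = asyncio.get_running_loop()
    # run_in_executor runs the blocking call in the default thread pool,
    # and the await lets the event loop schedule the other tasks meanwhile.
    out[region] = await loop.run_in_executor(None, describe_vpcs_stub, region)


async def run_all(worker, regions):
    out = {}
    start = time.perf_counter()
    await asyncio.gather(*(worker(region, out) for region in regions))
    return out, time.perf_counter() - start


REGIONS = ["us-east-1", "us-west-2", "eu-west-1"]
blocking_result, blocking_elapsed = asyncio.run(run_all(do_work_blocking, REGIONS))
offloaded_result, offloaded_elapsed = asyncio.run(run_all(do_work_offloaded, REGIONS))
print("blocking: %.2fs, offloaded: %.2fs" % (blocking_elapsed, offloaded_elapsed))
```

Under these assumptions the blocking variant takes roughly the sum of the individual sleeps, while the offloaded variant takes roughly as long as a single call.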

Upvotes: 0

Views: 918

Answers (1)

amittn

Reputation: 2355

  • This is how I did it in my AWS Lambda. I needed to send four POST requests and then collect all the responses. Hope this helps.
    loop = asyncio.get_event_loop()

    if loop.is_closed():
        loop = asyncio.new_event_loop()

    # perform_traces sends all the POST requests
    task = loop.create_task(perform_traces(payloads, message, contact_centre))
    unique_match, error = loop.run_until_complete(task)
    loop.close()

In the perform_traces method, this is how I used asyncio.wait with the session:

    future_dds_responses = []

    async with aiohttp.ClientSession() as session:

        for payload in payloads:
            future_dds_responses.append(dds_async_trace(session, payload, contact_centre))

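        # asyncio.wait returns two sets of tasks (done, pending);
        # call .result() on each done task to get its return value
        dds_responses, pending = await asyncio.wait(future_dds_responses)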

In dds_async_trace, this is how I send the POST using the aiohttp.ClientSession session:

        async with session.post(pds_url,
                                data=populated_template_payload,
                                headers=PDS_HEADERS,
                                ssl=ssl_context) as response:
            status_code = response.status
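The three fragments above can be condensed into one runnable sketch. It differs from the code above in several ways, all of them assumptions made to keep it self-contained: `fake_post` is a hypothetical stand-in that awaits `asyncio.sleep` instead of calling `session.post`, `asyncio.gather` replaces `asyncio.wait` so the results come back as a list in input order, and `asyncio.run` (available from Python 3.7) replaces the manual loop handling because it creates and closes a fresh loop on every call.

```python
import asyncio
import time


async def fake_post(payload):
    # Hypothetical stand-in for the aiohttp POST; it only awaits a short sleep.
    await asyncio.sleep(0.1)
    return {"payload": payload, "status": 200}


async def perform_traces(payloads):
    # gather schedules every coroutine at once and returns results in input order.
    return await asyncio.gather(*(fake_post(p) for p in payloads))


def lambda_handler(event, context):
    # asyncio.run creates a new event loop for this invocation and closes it afterwards,
    # so a warm container never sees a previously closed loop.
    responses = asyncio.run(perform_traces(event["payloads"]))
    return [r["status"] for r in responses]


start = time.perf_counter()
statuses = lambda_handler({"payloads": [1, 2, 3, 4]}, None)
elapsed = time.perf_counter() - start
print(statuses, "in %.2fs" % elapsed)
```

Because every awaited call here is non-blocking, the four requests overlap and the total time stays close to that of a single request.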

Upvotes: 1
