Binx

Reputation: 414

Python Async/Await Race Condition with Monday.com

I've been working with the Monday.com API to create a new board with groups/items. A coworker recommended that I use the async/await functionality to help speed up the process, since creating ~7k items was taking several hours. I have successfully implemented async/await, but I'm now facing a new issue that I'm not very familiar with: race conditions.

From my understanding, Monday's database cannot refresh fast enough for the concurrent POST requests it receives, and sometimes one write gets overwritten by another request. So I may get a status 200 (OK), but in reality the item was overwritten and never actually created.

A simple solution would be to add some time.sleep() calls to avoid this. However, given the data quantity, the processing time becomes undesirable, which is the reason I implemented async/await in the first place. I could also tighten the throttle (aiolimiter), but again, processing time...

I could set up a check loop at the end that looks for each created group/item and, if it doesn't exist, runs through the process again. I'm a bit nervous to go down this path, as I don't want the tool to turn into spaghetti code and infinite loops.
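To make that concrete, the shape I have in mind is a bounded retry, so a create that keeps failing surfaces as an error instead of looping forever. It reuses post_request and create_group_payload from the code below; verify_group_exists is a placeholder for a groups query I would still have to write against the API.

MAX_ATTEMPTS = 3  # hard cap so a failed create can never retry forever

async def create_group_verified(board_id, group_name):
    for attempt in range(1, MAX_ATTEMPTS + 1):
        await post_request(create_group_payload(board_id, group_name))
        # placeholder: would query the board's groups and check for group_name
        if await verify_group_exists(board_id, group_name):
            return True
        await asyncio.sleep(2 ** attempt)  # brief backoff before retrying
    return False  # give up and report the failure rather than loop forever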

I am looking for other suggestions on how to handle race conditions.

from aiolimiter import AsyncLimiter  # https://github.com/mjpieters/aiolimiter -- https://stackoverflow.com/a/45502319/10859585
import asyncio
import aiohttp
import json

# rate limit: at most 100 requests per rolling 60-second window
rate_limit = AsyncLimiter(100, 60)
# apiUrl, headers, timeout, and failed_requests are defined elsewhere in the script
async def post_request(data, url=apiUrl, headers=headers, timeout=timeout):

    try:
        async with rate_limit:
            async with aiohttp.ClientSession() as session:  # a new session per request; one shared session would be cheaper
                async with session.post(url=url, headers=headers, data=json.dumps(data), timeout=timeout) as response:
                    r = await response.json()
                    # aiohttp exposes response.status / response.reason
                    # (there is no .status_code or .error_message attribute)
                    print(response.status, response.reason)
                    print(r)
                    # Track the complexity limit
                    try:
                        after_complexity = r['data']['complexity']['after']
                        if after_complexity < 1000000:
                            print("WAITING FOR COMPLEXITY RATE TO RESET")
                            await asyncio.sleep(45)
                    except KeyError:
                        pass  # response carried no complexity block

                    # Track the group id's for items created. Used for updating items without
                    # needing to query the whole board to find where items go.
                    try:
                        g_id = data["variables"]["groupId"]
                        item_col_vals = data["variables"]["columnValues"]
                    except KeyError:  # group-creation payloads carry no variables block
                        g_id = None
                        item_col_vals = None

                    # response.raise_for_status()
                    # r already holds the parsed body; no need to read it twice
                    return r, response.status, g_id, item_col_vals
                
    except aiohttp.ClientError as erraio:
        failed_requests.add_failed_request(data)
        print("Client Error:", erraio)
        return None
    except Exception as e:
        print(e)
        return None
def create_group_payload(board_id, group_name):
    payload = f"""
    mutation {{
        complexity {{
                query
                before
                after
            }}
        create_group (board_id: {board_id}, group_name: {json.dumps(group_name)}) {{
            id
            title
        }}
    }}
    """
    data = {'query' : payload}

    return data
group_func_vars = zip(board_id_list, unique_projects)

# Parallelism ~9 sec for 29 groups
group_tasks = [asyncio.create_task(post_request(create_group_payload(*args))) for args in group_func_vars]
r = await asyncio.gather(*group_tasks)
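
(The bare await on the last line only works where an event loop is already running, e.g. Jupyter/IPython; in a plain script the same thing has to be driven by asyncio.run:)

async def main():
    tasks = [asyncio.create_task(post_request(create_group_payload(*args)))
             for args in zip(board_id_list, unique_projects)]
    return await asyncio.gather(*tasks)

r = asyncio.run(main())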

I read about and implemented asyncio.Lock(), but it seems to turn the code back into a sequential process.

lock = asyncio.Lock()  # created once and shared by every call; asyncio.lock() doesn't exist,
                       # and a fresh Lock per call would never actually block anything

async def post_request(data, url=apiUrl, headers=headers, timeout=timeout):

    try:
        async with rate_limit:
            async with aiohttp.ClientSession() as session:
                async with lock:
                    async with session.post(url=url, headers=headers, data=json.dumps(data), timeout=timeout) as response:
                        r = await response.json()
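
If I understand it correctly, a single shared Lock means only one POST is ever in flight, which is why everything becomes sequential again. The closest middle ground I can think of is one lock per group, so that only writes touching the same group take turns while everything else stays concurrent. A sketch of that idea (group_locks is my own addition, not part of aiolimiter or Monday's API):

from collections import defaultdict

# one Lock per group id: requests against different groups still run
# concurrently; only writes to the same group serialize
group_locks = defaultdict(asyncio.Lock)

async def post_request(data, url=apiUrl, headers=headers, timeout=timeout):
    g_id = data.get("variables", {}).get("groupId")  # None for board-level calls
    async with rate_limit:
        async with group_locks[g_id]:
            ...

Though I'm not sure any client-side lock helps if the overwrites are happening on Monday's side rather than between my own coroutines.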

Upvotes: 0

Views: 117

Answers (0)
