Atihska

How can I schedule multiple tasks using asyncio constructs in a loop?

My use case is running some performance tests, so I want to create an app that runs 1 task 4 times and computes the average time for that task, then runs 2 tasks concurrently and computes the average, then 4 tasks concurrently and computes the average, then 8, and so on.

However, I can't get it to run like this. When I try, it seems all the tasks have already been executed beforehand, and I get wrong times.

After some trial and error, with the code below I now get TypeError: An asyncio.Future, a coroutine or an awaitable is required, followed by sys:1: RuntimeWarning: coroutine 'go' was never awaited, on the line loop.run_until_complete(asyncio.wait(asyncio.ensure_future(some_tasks))) in the run_tasks function.

Below is my code:

import asyncio
import time
from datetime import datetime, timedelta

import aiopg


async def go(date):
    pool = await aiopg.create_pool("**db connection**")
    async with pool.acquire() as conn:
        async with conn.cursor() as cur:

            await cur.execute(""" some query """)
            time.sleep(1)

            ret = []
            async for row in cur:
                ret.append(row)


def date_range(date1, date2):
    for n in range(int((date2 - date1).days)+1):
        yield date1 + timedelta(n)


def run_tasks():

    start_dt = datetime(2017, 8, 9)
    end_dt = datetime(2017, 8, 10)

    tasks = []
    some_tasks = []

    avg_time_run = []

    for dt in date_range(start_dt, end_dt):
        #tasks.append(asyncio.ensure_future(go(dt.strftime("%Y-%m-%d %H:%M:%S"))))
        tasks.append(go(dt.strftime("%Y-%m-%d %H:%M:%S")))

    i = 1
    prev = 0
    while i < 2: # i < 128

        # Get i number of tasks from task list
        for k in range(prev, i):
            some_tasks.append(tasks[k])

        prev = len(some_tasks)
        time_run = []
        for j in range(0, 4):  # repeat task 4 times
            start = time.time()
            loop = asyncio.get_event_loop()

            loop.run_until_complete(asyncio.wait(asyncio.ensure_future(some_tasks)))
            # loop.close()

            end = time.time()
            diff = end - start
            time_run.append(diff)
            print("ith SomeTask: {}, {}".format(i, some_tasks))
            print("Total time: {}".format(diff))

        # get average of each task run 4 times
        avg_time_run.append(sum(time_run) / float(len(time_run)))
        i *= 2

    return avg_time_run


print(run_tasks())    

Any hints would be appreciated. Where should I put await, given that the call is already wrapped in asyncio.wait?
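A related pitfall in the loop above: every j repetition hands the same coroutine objects from some_tasks to the event loop again, but a coroutine object can only be awaited once. A minimal sketch of what happens on the second await, assuming Python 3.7+ (for asyncio.run); go() here is a trivial hypothetical coroutine, not the database one:

```python
import asyncio


async def go():
    # Hypothetical trivial coroutine standing in for the real query.
    await asyncio.sleep(0)
    return "done"


async def main():
    coro = go()
    first = await coro  # the first await runs the coroutine body
    second = None
    try:
        await coro  # awaiting the same object again is an error
    except RuntimeError as exc:
        second = str(exc)
    return first, second


first, second = asyncio.run(main())
print(first)   # done
print(second)  # cannot reuse already awaited coroutine
```

So each repetition needs freshly created coroutines (for example, calling go(...) again inside the j loop).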

Atihska

ANSWER CODE:

import asyncio
import time
from datetime import datetime, timedelta

import asyncpg


async def run(date):  # for aiopg, see the go() function above
    conn = await asyncpg.connect("db connections")
    values = await conn.fetch("""some query """)
    await asyncio.sleep(1)
    await conn.close()


def date_range(date1, date2):
    for n in range(int((date2 - date1).days)+1):
        yield date1 + timedelta(n)


def run_tasks():

    start_dt = datetime(2017, 8, 9)
    end_dt = datetime(2017, 8, 10)

    tasks = []

    avg_time_run = []

    i = 1

    while i < 9:  # num of tasks incremented
        time_run = []

        start = time.time()
        loop = asyncio.get_event_loop()

        for dt in date_range(start_dt, end_dt):
            if len(tasks) < i:
                print(dt)
                tasks.append(asyncio.ensure_future(run(dt.strftime("%Y-%m-%d %H:%M:%S"))))

                if len(tasks) == i:

                    for j in range(0, 4):  # repeat task 4 times
                        print("J counter: {}".format(j))

                        loop.run_until_complete(asyncio.wait(tasks))

                        end = time.time()
                        diff = end - start
                        time_run.append(diff)
                        print("Num of Tasks executing: {}, {}".format(i, tasks))
                        print("Task len: {}".format(len(tasks)))
                        print("Total time: {}".format(diff))

        # get average of each task run 4 times
        avg_time_run.append(sum(time_run) / float(len(time_run)))
        start_dt = end_dt + timedelta(days=1)
        end_dt = end_dt + timedelta(days=(i * 2 - i))
        i *= 2

        print(start_dt)
        print(end_dt)
        #loop.close()

    return avg_time_run


print(run_tasks())
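For comparison, a minimal sketch of the same benchmark (1, 2, 4 and 8 concurrent tasks, each batch repeated 4 times and averaged). It assumes Python 3.7+ for asyncio.run, and query() is a hypothetical stand-in that sleeps instead of hitting the database; the real aiopg/asyncpg call would go in its place. Each repetition builds fresh coroutines and restarts the timer, so every batch actually runs and is timed on its own:

```python
import asyncio
import time


async def query(day):
    # Hypothetical stand-in for the real aiopg/asyncpg query: just sleeps.
    await asyncio.sleep(0.05)
    return day


async def benchmark(max_tasks=8, repeats=4):
    averages = {}
    n = 1
    while n <= max_tasks:  # 1, 2, 4, 8 concurrent tasks
        timings = []
        for _ in range(repeats):
            # Build fresh coroutines each repetition and restart the timer.
            start = time.perf_counter()
            await asyncio.gather(*(query(day) for day in range(n)))
            timings.append(time.perf_counter() - start)
        averages[n] = sum(timings) / len(timings)
        n *= 2
    return averages


averages = asyncio.run(benchmark())
for n, avg in averages.items():
    print(f"{n} task(s): {avg:.3f}s average")
```

Because the sleeps overlap, every batch size should average roughly 0.05 s here; with a real database the numbers show how well it handles the extra concurrency.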

Mikhail Gerasimov

asyncio.ensure_future(some_tasks)

You're passing a list of coroutines to asyncio.ensure_future. As the documentation shows, that is not how this function works: you should pass a single coroutine to get an asyncio.Task. That is why you get the TypeError; the RuntimeWarning follows because, as a result, the go coroutines you created were never awaited.
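A minimal reproduction of the difference, assuming Python 3.7+ (for asyncio.run); go() here is a hypothetical stand-in that sleeps briefly instead of querying the database:

```python
import asyncio


async def go(n):
    # Hypothetical stand-in for the real database coroutine.
    await asyncio.sleep(0.01)
    return n


async def main():
    coros = [go(n) for n in range(3)]
    error = None
    try:
        # Wrong: ensure_future() accepts ONE awaitable, not a list of them.
        asyncio.ensure_future(coros)
    except TypeError as exc:
        error = str(exc)
    # Right: wrap each coroutine individually, then wait for all of them.
    tasks = [asyncio.ensure_future(c) for c in coros]
    results = await asyncio.gather(*tasks)
    return error, results


error, results = asyncio.run(main())
print(error)    # An asyncio.Future, a coroutine or an awaitable is required
print(results)  # [0, 1, 2]
```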

You don't need asyncio.Task in this case at all; just pass the list of coroutines to asyncio.wait:

loop.run_until_complete(asyncio.wait(some_tasks))
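This relies on asyncio.wait wrapping bare coroutines in tasks for you, which was deprecated in Python 3.8 and removed in 3.11; on current versions asyncio.wait raises TypeError when given coroutines. A sketch that works on both older and newer versions, keeping the same run_until_complete style, creates the tasks explicitly first (go() is again a hypothetical stand-in for the real query):

```python
import asyncio


async def go(n):
    # Hypothetical stand-in for the real database query.
    await asyncio.sleep(0.01)
    return n * 10


loop = asyncio.new_event_loop()
try:
    # Wrap each coroutine in a Task bound to this loop before waiting on them.
    tasks = [loop.create_task(go(n)) for n in range(4)]
    done, pending = loop.run_until_complete(asyncio.wait(tasks))
    results = sorted(t.result() for t in done)
finally:
    loop.close()

print(results)       # [0, 10, 20, 30]
print(len(pending))  # 0
```

asyncio.wait returns unordered sets of done and pending tasks, hence the sorted(); asyncio.gather is the usual choice when results are needed in submission order.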

One more important thing:

time.sleep(1)

You should never do this inside a coroutine: it blocks your event loop (and every other coroutine with it). Please read this answer to learn how asyncio works in general.

If you want to sleep inside a coroutine, use asyncio.sleep:

await asyncio.sleep(1)
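To see the difference, a small timing sketch, assuming Python 3.7+ for asyncio.run: three coroutines that each sleep 0.2 s finish in about 0.2 s total with asyncio.sleep, but take about 0.6 s with time.sleep, because each blocking call stops the event loop until it returns. The 0.2 s delay and the job names are arbitrary choices for illustration:

```python
import asyncio
import time


async def blocking_job():
    time.sleep(0.2)  # blocks the whole event loop; nothing else can run


async def nonblocking_job():
    await asyncio.sleep(0.2)  # yields to the event loop while waiting


async def timed(job, n=3):
    # Run n copies of the job concurrently and measure the wall-clock time.
    start = time.perf_counter()
    await asyncio.gather(*(job() for _ in range(n)))
    return time.perf_counter() - start


blocking = asyncio.run(timed(blocking_job))
nonblocking = asyncio.run(timed(nonblocking_job))
print(f"time.sleep:    {blocking:.2f}s")     # roughly 3 x 0.2 s
print(f"asyncio.sleep: {nonblocking:.2f}s")  # roughly 0.2 s
```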
