Reputation: 5126
My use case is to run some performance tests so I wanted to create an app where I run 1 task 4 times, compute the time average for that task, then run 2 tasks asynchronously, compute the average, then run 4 tasks asynchronously, compute the average, then 8 and so on.
However, I am not able to run like this. When I do, all tasks it seems have been executed before and I get wrong times.
I tried some hit and trial and with the below code now I get TypeError: An asyncio.Future, a coroutine or an awaitable is required
sys:1: RuntimeWarning: coroutine 'go' was never awaited
on line loop.run_until_complete(asyncio.wait(asyncio.ensure_future(some_tasks)))
in run_tasks
function.
Below is my code:
async def go(date):
pool = await aiopg.create_pool("**db connection**")
async with pool.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute(""" some query """)
time.sleep(1)
ret = []
async for row in cur:
ret.append(row)
def date_range(date1, date2):
for n in range(int((date2 - date1).days)+1):
yield date1 + timedelta(n)
def run_tasks():
start_dt = datetime(2017, 8, 9)
end_dt = datetime(2017, 8, 10)
tasks = []
some_tasks = []
avg_time_run = []
for dt in date_range(start_dt, end_dt):
#tasks.append(asyncio.ensure_future(go(dt.strftime("%Y-%m-%d %H:%M:%S"))))
tasks.append(go(dt.strftime("%Y-%m-%d %H:%M:%S")))
i = 1
prev = 0
while i < 2: # i < 128
# Get i number of tasks from task list
for k in range(prev, i):
some_tasks.append(tasks[k])
prev = len(some_tasks)
time_run = []
for j in range(0, 4): # repeat task 4 times
start = time.time()
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(asyncio.ensure_future(some_tasks)))
# loop.close()
end = time.time()
diff = end - start
time_run.append(diff)
print("ith SomeTask: {}, {}".format(i, some_tasks))
print("Total time: {}".format(diff))
# get average of each task run 4 times
avg_time_run.append(sum(time_run) / float(len(time_run)))
i *= 2
return avg_time_run
print(run_tasks())
Some hints will be appreciated. Where should I put await as it's there as asyncio.wait
Upvotes: 2
Views: 7516
Reputation: 5126
ANSWER CODE:
async def run(date): // for adopt, check above go() function
conn = await asyncpg.connect("db connections")
values = await conn.fetch("""some query """)
await asyncio.sleep(1)
await conn.close()
def date_range(date1, date2):
for n in range(int((date2 - date1).days)+1):
yield date1 + timedelta(n)
def run_tasks():
start_dt = datetime(2017, 8, 9)
end_dt = datetime(2017, 8, 10)
tasks = []
avg_time_run = []
i = 1
while i < 9: # num of tasks incremented
time_run = []
start = time.time()
loop = asyncio.get_event_loop()
for dt in date_range(start_dt, end_dt):
if len(tasks) < i:
print(dt)
tasks.append(asyncio.ensure_future(run(dt.strftime("%Y-%m-%d %H:%M:%S"))))
if len(tasks) == i:
for j in range(0, 4): # repeat task 4 times
print("J counter: {}".format(j))
loop.run_until_complete(asyncio.wait(tasks))
end = time.time()
diff = end - start
time_run.append(diff)
print("Num of Tasks executing: {}, {}".format(i, tasks))
print("Task len: {}".format(len(tasks)))
print("Total time: {}".format(diff))
# get average of each task run 4 times
avg_time_run.append(sum(time_run) / float(len(time_run)))
start_dt = end_dt + timedelta(days=1)
end_dt = end_dt + timedelta(days=(i * 2 - i))
i *= 2
print(start_dt)
print(end_dt)
#loop.close()
return avg_time_run
print(run_tasks())
Upvotes: 0
Reputation: 39536
asyncio.ensure_future(some_tasks)
You're passing list of coroutines to asyncio.ensure_future
. As you can see in documentation this is not how this function works: you should pass single coroutine to create asyncio.Task. This is why you're getting TypeError
, you're getting RuntimeWarning
then since created go
coroutines weren't awaited as result of all above.
You don't need asyncio.Task
in this case at all, just pass list of coroutines to asyncio.wait
:
loop.run_until_complete(asyncio.wait(some_tasks))
One more important thing:
time.sleep(1)
You should never do it inside coroutines: it freezes your event loop (and all coroutines everywhere with it). Please read this answer to read about how asyncio works in general.
If you want to sleep some time inside coroutines use asyncio.sleep:
await asyncio.sleep(1)
Upvotes: 3