Reputation: 6960
Is it possible to return execution to event loop
from a function. And as soon Task
will be completed return to the function and continue execution?
Im trying to use pytest-asyncio
plugin
Example:
@pytest.mark.asyncio
async def test_async1(event_loop):
print('start 1')
res = event_loop.create_task(
send_async_request("http://test.com", limit=1000))) # here I need to return execution to event loop and continue only after getting response from send_async_request function
print('end1',res)
@pytest.mark.asyncio
async def test_async2(event_loop):
print('start 2')
res = event_loop.create_task(
send_async_request("http://test2", limit=1000))) # here I need to return execution to event loop and continue only after getting response from send_async_request function
print('end2', res)
send_async_request
- aiohttp
:
@asyncio.coroutine
def send_async_request(url, method='GET'):
with aiohttp.ClientSession() as session:
resp = yield from session.get(url, timeout=60)
if resp.status == 200:
return resp.status, response
else:
return resp.status, False
Upvotes: 8
Views: 12094
Reputation: 13423
When your tests are marked with pytest.mark.asyncio
, they become coroutines so you can use the await
syntax:
@pytest.mark.asyncio
async def test_sleep(event_loop):
result = await asyncio.sleep(1, result=3, loop=event_loop)
assert result == 3
EDIT: Another example, with multiple sleep
operations:
@pytest.mark.asyncio
async def test_multiple_sleep(event_loop):
tasks = [event_loop.create_task(asyncio.sleep(1, result=x))
for x in range(10)]
results = await asyncio.gather(*tasks)
assert results == list(range(10))
Upvotes: 7