Reputation: 4950
I want to test how asyncio
handle blocking processes.
There must be something wrong with my code because asyncio.TimeoutError
is never raised:
import asyncio, random, time
q = asyncio.Queue()
MAX_WAIT = 5
@asyncio.coroutine
def blocking_task(sec):
print('This task will sleep {} sec.'.format(sec))
time.sleep(sec)
@asyncio.coroutine
def produce():
while True:
q.put_nowait(random.randint(1,10))
yield from asyncio.sleep(0.5 + random.random())
@asyncio.coroutine
def consume():
while True:
value = yield from q.get()
try:
yield from asyncio.wait_for(blocking_task(value), MAX_WAIT)
except asyncio.TimeoutError:
print('~/~ Job has been canceled !!')
else:
print('=/= Job has been done :]')
loop = asyncio.get_event_loop()
asyncio.ensure_future(produce())
asyncio.ensure_future(consume())
loop.run_forever()
This code produce the following output:
$ ./tst3.py
This task will sleep 2 sec.
=/= Job has been done :]
This task will sleep 1 sec.
=/= Job has been done :]
This task will sleep 7 sec.
=/= Job has been done :]
Upvotes: 1
Views: 411
Reputation: 3080
Use
asyncio.sleep
instead ofsleep
The TimeoutError of asyncio is different from buildin TimeoutError. That's why you can not use time.sleep to trigger this error. For triggering TimeoutError in asyncio.coroutine, you only can use timer which is implemented by asyncio module.
@asyncio.coroutine
def blocking_task(sec):
print('This task will sleep {} sec.'.format(sec))
yield from asyncio.sleep(sec)
Result
This task will sleep 10 sec.
~/~ Job has been canceled !!
This task will sleep 3 sec.
=/= Job has been done :]
This task will sleep 4 sec.
=/= Job has been done :]
This task will sleep 2 sec.
=/= Job has been done :]
This task will sleep 7 sec.
~/~ Job has been canceled !!
This task will sleep 2 sec.
=/= Job has been done :]
Upvotes: 2