Reputation: 3519
Suppose I have two functions that work like this:
    import tornado.gen
    import tornado.ioloop

    @tornado.gen.coroutine
    def f():
        for i in range(4):
            print("f", i)
            yield tornado.gen.sleep(0.5)

    @tornado.gen.coroutine
    def g():
        yield tornado.gen.sleep(1)
        print("Let's raise RuntimeError")
        raise RuntimeError
In general, function f might contain an endless loop and never return (e.g. it may be processing a queue).
What I want is to be able to interrupt it at any point where it yields.
The most obvious way doesn't work: the exception is only raised after f exits (and if f is endless, that never happens).
    @tornado.gen.coroutine
    def main():
        try:
            yield [f(), g()]
        except Exception as e:
            print("Caught", repr(e))

        while True:
            yield tornado.gen.sleep(10)

    if __name__ == "__main__":
        tornado.ioloop.IOLoop.instance().run_sync(main)
Output:
f 0
f 1
Let's raise RuntimeError
f 2
f 3
Traceback (most recent call last):
  File "/tmp/test/lib/python3.4/site-packages/tornado/gen.py", line 812, in run
    yielded = self.gen.send(value)
StopIteration

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  <...>
  File "test.py", line 16, in g
    raise RuntimeError
RuntimeError
That is, the exception is only raised once both coroutines have returned (both futures have resolved).
This is partially solved by tornado.gen.WaitIterator, but it's buggy (unless I'm mistaken). But that's not the point.
It still doesn't solve the problem of interrupting existing coroutines: a coroutine continues to run even after the function that started it has exited.
EDIT: it seems coroutine cancellation is not really supported in Tornado, unlike in Python's asyncio, where you can easily throw CancelledError at every yield point.
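For comparison, here is a minimal asyncio-only sketch of what "throwing CancelledError at a yield point" looks like. The names worker and demo and the short sleep intervals are made up for illustration; this is not Tornado code.

```python
import asyncio


async def worker(log):
    # Endless loop; every await is a point where cancellation can land.
    try:
        while True:
            log.append("tick")
            await asyncio.sleep(0.01)
    except asyncio.CancelledError:
        log.append("cancelled")
        raise  # re-raise so the task ends up in the cancelled state


async def demo():
    log = []
    task = asyncio.ensure_future(worker(log))
    await asyncio.sleep(0.035)
    # CancelledError is thrown into worker at the await it is suspended on.
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return log, task.cancelled()


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The worker never checks a flag itself; the interruption is delivered by the event loop at whatever await the task happens to be suspended on.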
Upvotes: 2
Views: 2246
Reputation: 626
Warning: This is not a working solution. Look at the commentary. Still, if you're new (as I am), this example can show the logical flow. Thanks @nathaniel-j-smith and @wgh
What is the difference compared to using something more primitive, like a global variable, for instance?
    import asyncio

    event = asyncio.Event()
    aflag = False

    async def short():
        while not aflag:
            print('short repeat')
            await asyncio.sleep(1)
        print('short end')

    async def long():
        global aflag
        print('LONG START')
        await asyncio.sleep(3)
        aflag = True
        print('LONG END')

    async def main():
        await asyncio.gather(long(), short())

    if __name__ == '__main__':
        asyncio.run(main())
It is for asyncio, but I guess the idea stays the same. This is a semi-question (why would Event be better?). Yet the solution yields exactly the result the author needs:
LONG START
short repeat
short repeat
short repeat
LONG END
short end
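As one possible answer to the semi-question, here is a sketch of the same flow using asyncio.Event instead of the global flag. The difference I can see is that the waiting coroutine wakes up as soon as the event is set, rather than on its next poll. The stop and log parameters and the shortened intervals are my own additions, and this is still cooperative signalling, not real cancellation.

```python
import asyncio


async def short(stop, log):
    # Wait on the event with a timeout instead of sleeping blindly:
    # the loop wakes up as soon as stop is set.
    while not stop.is_set():
        log.append("short repeat")
        try:
            await asyncio.wait_for(stop.wait(), timeout=0.1)
        except asyncio.TimeoutError:
            pass  # timeout elapsed and the event is still unset: loop again
    log.append("short end")


async def long(stop, log):
    log.append("LONG START")
    await asyncio.sleep(0.25)
    stop.set()
    log.append("LONG END")


async def main():
    stop = asyncio.Event()  # created inside the running event loop
    log = []
    await asyncio.gather(long(stop, log), short(stop, log))
    return log


if __name__ == "__main__":
    print("\n".join(asyncio.run(main())))
```

Passing the event in as an argument also avoids the module-level global, which makes it possible to run several independent pairs of coroutines.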
UPDATE: these slides may be really helpful in understanding the core of the problem.
Upvotes: -1
Reputation: 25263
Since version 5, Tornado runs on the asyncio event loop.
On Python 3, the IOLoop is always a wrapper around the asyncio event loop, and asyncio.Future and asyncio.Task are used instead of their Tornado counterparts.
Hence you can use asyncio Task cancellation, i.e. asyncio.Task.cancel.
Your example with a queue-reading while-true loop might look like this.
    import logging
    from asyncio import CancelledError

    from tornado import ioloop, gen

    async def read_off_a_queue():
        while True:
            try:
                await gen.sleep(1)
            except CancelledError:
                logging.debug('Reader cancelled')
                break
            else:
                logging.debug('Pretend a task is consumed')

    async def do_some_work():
        await gen.sleep(5)
        logging.debug('do_some_work is raising')
        raise RuntimeError

    async def main():
        logging.debug('Starting queue reader in background')
        reader_task = gen.convert_yielded(read_off_a_queue())
        try:
            await do_some_work()
        except RuntimeError:
            logging.debug('do_some_work failed, cancelling reader')
            reader_task.cancel()
            # give the task a chance to clean up, in case it
            # catches CancelledError and awaits something
            try:
                await reader_task
            except CancelledError:
                pass

    if __name__ == '__main__':
        logging.basicConfig(level='DEBUG')
        ioloop.IOLoop.instance().run_sync(main)
If you run it, you should see:
DEBUG:asyncio:Using selector: EpollSelector
DEBUG:root:Starting queue reader in background
DEBUG:root:Pretend a task is consumed
DEBUG:root:Pretend a task is consumed
DEBUG:root:Pretend a task is consumed
DEBUG:root:Pretend a task is consumed
DEBUG:root:do_some_work is raising
DEBUG:root:do_some_work failed, cancelling reader
DEBUG:root:Reader cancelled
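If you want something closer to the original `yield [f(), g()]`, where several coroutines run side by side and the first failure should stop the rest, a sketch along these lines may work. It uses plain asyncio.wait with FIRST_EXCEPTION; I use asyncio.sleep instead of gen.sleep only to keep the example dependency-free, and the short intervals and the log list are illustrative.

```python
import asyncio


async def f(log):
    i = 0
    while True:  # endless, like a queue reader
        log.append(("f", i))
        i += 1
        await asyncio.sleep(0.05)


async def g():
    await asyncio.sleep(0.12)
    raise RuntimeError("g failed")


async def main():
    log = []
    tasks = [asyncio.ensure_future(f(log)), asyncio.ensure_future(g())]
    # Returns as soon as any task raises (or when all of them finish).
    done, pending = await asyncio.wait(
        tasks, return_when=asyncio.FIRST_EXCEPTION)
    for task in pending:
        task.cancel()
    # Let the cancelled tasks unwind; return_exceptions keeps their
    # CancelledError from propagating out of gather.
    await asyncio.gather(*pending, return_exceptions=True)
    errors = [t.exception() for t in done if t.exception() is not None]
    return log, errors, [t.cancelled() for t in pending]


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Here f needs no special handling at all: the cancellation arrives at whichever await it is suspended on when g fails.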
Upvotes: 1
Reputation: 24007
If you use WaitIterator according to the instructions, and use a toro.Event to signal between coroutines, it works as expected:
    from datetime import timedelta

    import tornado.gen
    import tornado.ioloop
    import toro

    stop = toro.Event()

    @tornado.gen.coroutine
    def f():
        for i in range(4):
            print("f", i)
            # wait raises Timeout if not set before the deadline.
            try:
                yield stop.wait(timedelta(seconds=0.5))
                print("f done")
                return
            except toro.Timeout:
                print("f continuing")

    @tornado.gen.coroutine
    def g():
        yield tornado.gen.sleep(1)
        print("Let's raise RuntimeError")
        raise RuntimeError

    @tornado.gen.coroutine
    def main():
        wait_iterator = tornado.gen.WaitIterator(f(), g())
        while not wait_iterator.done():
            try:
                result = yield wait_iterator.next()
            except Exception as e:
                print("Error {} from {}".format(e, wait_iterator.current_future))
                stop.set()
            else:
                print("Result {} received from {} at {}".format(
                    result, wait_iterator.current_future,
                    wait_iterator.current_index))

    if __name__ == "__main__":
        tornado.ioloop.IOLoop.instance().run_sync(main)
For now, pip install toro to get the Event class. Tornado 4.2 will include Event, see the changelog.
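Once Event is available in Tornado itself, the same signalling idea might look roughly like the sketch below. It assumes Tornado 5 or later on Python 3.7+, uses tornado.locks.Event with native coroutines, and simplifies main by dropping WaitIterator; the stop and log parameters and the shortened intervals are illustrative, so treat it as a starting point, not a drop-in replacement for the code above.

```python
import asyncio
from datetime import timedelta

from tornado import gen, locks, util


async def f(stop, log):
    for i in range(4):
        log.append(("f", i))
        try:
            # Event.wait raises tornado.util.TimeoutError if the event
            # is not set before the timeout expires.
            await stop.wait(timeout=timedelta(seconds=0.05))
        except util.TimeoutError:
            continue  # not signalled yet, carry on with the next item
        log.append("f done")
        return


async def g():
    await gen.sleep(0.07)
    raise RuntimeError


async def main():
    stop = locks.Event()
    log = []
    # Start f in the background, then run g in the foreground.
    f_future = gen.convert_yielded(f(stop, log))
    try:
        await g()
    except RuntimeError:
        stop.set()  # ask f to stop at its next wait
    await f_future
    return log


if __name__ == "__main__":
    print(asyncio.run(main()))
```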
Upvotes: 5