Reputation: 1217
I've implemented an HTTP wrapper around a heavy task, and I chose Tornado as the front-end server framework (because the heavy task is written in Python, and I'm used to Tornado).
Currently, I call the heavy task directly from Tornado's process. I prepared a web-based interface using jQuery that sends an AJAX request with the parameters set in a form.
As you may imagine, a task started from my web browser isn't cancellable. The only way I can cancel it is to send signal 9 or 15 to the Python process, and that's not something users can usually do.
I want to let the currently running task be cancelled by sending some kind of "cancel" request via HTTP. How can this be done? What do most web services that handle heavy tasks (e.g. video encoding on YouTube) do?
Upvotes: 1
Views: 429
Reputation: 12587
Actually, Tornado's Futures do not support cancellation (docs). Moreover, even when using with_timeout, the timed-out job keeps running; it is just that nothing waits for its result anymore.
The only way, as also stated in "How can I cancel a hanging asyncronous task in tornado, with a timeout?", is to implement the logic in such a way that it can be cancelled (with some flag or similar).
Example:
- / lists jobs
- /add/TIME adds a new job; TIME (in seconds) specifies how long to sleep
- /cancel/ID cancels the job

The code might look like:

    from tornado.ioloop import IOLoop
    from tornado import gen, web
    from time import time


    class Job():

        def __init__(self, run_sec):
            self.run_sec = int(run_sec)
            self.start_time = None
            self.end_time = None
            self._cancelled = False

        @gen.coroutine
        def run(self):
            """ Some job

            The job is simple: sleep for a given number of seconds.
            It could be implemented as:
                yield gen.sleep(self.run_sec)
            but this way makes it not cancellable, so
            it is divided: run 1s sleep, run_sec times
            """
            self.start_time = time()
            deadline = self.start_time + self.run_sec
            while not self._cancelled:
                yield gen.sleep(1)
                if time() >= deadline:
                    break
            self.end_time = time()

        def cancel(self):
            """ Cancels job

            Returns None on success,
            raises Exception on error:
                if job is already cancelled or done
            """
            if self._cancelled:
                raise Exception('Job is already cancelled')
            if self.end_time is not None:
                raise Exception('Job is already done')
            self._cancelled = True

        def get_state(self):
            if self._cancelled:
                if self.end_time is None:
                    # job might still be running
                    # and will be stopped on the next while check
                    return 'CANCELING...'
                else:
                    return 'CANCELLED'
            elif self.start_time is None:
                # in practice this is never shown,
                # as the job is started immediately after creation
                return 'NOT STARTED'
            elif self.end_time is None:
                return 'RUNNING...'
            else:
                return 'DONE'


    class MainHandler(web.RequestHandler):

        def get(self, op=None, param=None):
            if op == 'add':
                # add new job
                new_job = Job(run_sec=param)
                self.application.jobs.append(new_job)
                # start the coroutine without waiting for its result
                new_job.run()
                self.write('Job added')
            elif op == 'cancel':
                # cancel job - stop running
                self.application.jobs[int(param)].cancel()
                self.write('Job cancelled')
            else:
                # list jobs
                self.write('<pre>')  # this is so ugly... ;P
                self.write('ID\tRUNSEC\tSTART_TIME\tSTATE\tEND_TIME\n')
                for idx, job in enumerate(self.application.jobs):
                    self.write('%s\t%s\t%s\t%s\t%s\n' % (
                        idx, job.run_sec, job.start_time,
                        job.get_state(), job.end_time
                    ))
                self.write('</pre>')


    class MyApplication(web.Application):

        def __init__(self):
            # to store tasks
            self.jobs = []
            super(MyApplication, self).__init__([
                (r"/", MainHandler),
                # \d+ so that a missing number does not reach int('')
                (r"/(add)/(\d+)", MainHandler),
                (r"/(cancel)/(\d+)", MainHandler),
            ])


    if __name__ == "__main__":
        MyApplication().listen(8888)
        IOLoop.current().start()

Add a couple of jobs:

    for a in `seq 12 120`; do curl http://127.0.0.1:8888/add/$a; done

Then cancel some... Note: this requires only Tornado.
This example is very simple; gen.sleep stands in for your heavy task. Of course, not all jobs are as simple as that to implement in a cancellable way.
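
For a task that blocks instead of yielding, the same flag idea could be applied by splitting the work into chunks and running it in a worker thread. The following is only a minimal sketch using the standard library: the class name CancellableJob, its parameters, and the time.sleep call standing in for one chunk of real work are all assumptions made for illustration, not part of Tornado or of the code above.

```python
import threading
import time


class CancellableJob:
    """Hypothetical helper: runs a chunked, blocking task in a worker thread."""

    def __init__(self, total_steps, step_seconds=0.01):
        self.total_steps = total_steps
        self.step_seconds = step_seconds
        self.steps_done = 0
        self._cancel_event = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)

    def start(self):
        self._thread.start()

    def _run(self):
        for _ in range(self.total_steps):
            # The flag is only checked between chunks;
            # a chunk that is already running is never interrupted.
            if self._cancel_event.is_set():
                return
            time.sleep(self.step_seconds)  # stand-in for one chunk of heavy work
            self.steps_done += 1

    def cancel(self):
        # Only requests cancellation; the worker stops at its next check.
        self._cancel_event.set()

    def join(self, timeout=None):
        self._thread.join(timeout)

    @property
    def state(self):
        if self._thread.ident is None:
            return 'NOT STARTED'
        if self._thread.is_alive():
            if self._cancel_event.is_set():
                return 'CANCELING...'
            return 'RUNNING...'
        if self.steps_done == self.total_steps:
            return 'DONE'
        return 'CANCELLED'


if __name__ == "__main__":
    job = CancellableJob(total_steps=1000, step_seconds=0.01)
    job.start()
    time.sleep(0.05)   # let a few chunks run
    job.cancel()
    job.join(timeout=2)
    print(job.state, job.steps_done)
```

How quickly a cancel request takes effect depends on the chunk size: the smaller each chunk, the sooner the worker notices the flag, at the cost of more frequent checks. A job that cannot be split into chunks at all cannot be cancelled this way.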
Upvotes: 1