Izumi Kawashima

Reputation: 1217

How to let long tasks be "cancellable" via HTTP on Tornado HTTP servers?

I've implemented an HTTP wrapper around a heavy task, and I chose Tornado as the front-end server framework (because the heavy task is written in Python, and I'm used to Tornado).

Currently, I call the heavy task directly from Tornado's process. I prepared a web-based interface using jQuery, which sends an AJAX request with the parameters set in a form.

As you may imagine, a task started from my web browser isn't cancellable. The only way to cancel it is to send signal 9 (SIGKILL) or 15 (SIGTERM) to the Python process, and that's not something users can usually do.

I want to let the currently running task be cancelled by sending some kind of "cancel" request via HTTP. How can this be done? What do most web services that handle heavy tasks (e.g. video encoding on YouTube) do?

Upvotes: 1

Views: 429

Answers (1)

kwarunek

Reputation: 12587

Actually, Tornado's Futures do not support cancellation (docs). Moreover, even with with_timeout, a job that has timed out keeps running; the only difference is that nothing waits for its result any more.

The only way, as also stated in How can I cancel a hanging asyncronous task in tornado, with a timeout?, is to implement the logic in such a way that it can be cancelled (with some flag or similar).

Example:

  • job is a simple async sleep
  • / lists jobs
  • /add/TIME adds a new job; TIME is how long to sleep, in seconds
  • /cancel/ID cancels the job with the given ID

The code might look like:

from tornado.ioloop import IOLoop
from tornado import gen, web
from time import time

class Job():

    def __init__(self, run_sec):
        self.run_sec = int(run_sec)
        self.start_time = None
        self.end_time = None
        self._cancelled = False

    @gen.coroutine
    def run(self):
        """ Some job

        The job is simple: sleep for a given number of seconds.
        It could be implemented as:
             yield gen.sleep(self.run_sec)
        but a single long sleep cannot be interrupted, so it is
        split into 1-second sleeps, with the cancel flag checked
        between them.
        """
        self.start_time = time()
        deadline = self.start_time + self.run_sec
        while not self._cancelled:
            yield gen.sleep(1)
            if time() >= deadline:
                break
        self.end_time = time()

    def cancel(self):
        """ Cancels job

        Returns None on success,
        raises Exception on error:
          if job is already cancelled or done
        """
        if self._cancelled:
            raise Exception('Job is already cancelled')
        if self.end_time is not None:
            raise Exception('Job is already done')
        self._cancelled = True

    def get_state(self):
        if self._cancelled:
            if self.end_time is None:
                # job might be running still
                # and will be stopped on the next while check
                return 'CANCELING...'
            else:
                return 'CANCELLED'
        elif self.start_time is None:
            # in practice this is never shown,
            # as a job is started immediately after creation
            return 'NOT STARTED'
        elif self.end_time is None:
            return 'RUNNING...'
        else:
            return 'DONE'


class MainHandler(web.RequestHandler):

    def get(self, op=None, param=None):
        if op == 'add':
            # add new job
            new_job = Job(run_sec=param)
            self.application.jobs.append(new_job)
            new_job.run()
            self.write('Job added')
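        elif op == 'cancel':
            # cancel job - it stops at its next flag check
            try:
                job = self.application.jobs[int(param)]
            except (IndexError, ValueError):
                # unknown or missing job ID
                raise web.HTTPError(404, reason='No such job')
            job.cancel()
            self.write('Job cancelled')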
        else:
            # list jobs
            self.write('<pre>') # this is so ugly... ;P
            self.write('ID\tRUNSEC\tSTART_TIME\tSTATE\tEND_TIME\n')
            for idx, job in enumerate(self.application.jobs):
                self.write('%s\t%s\t%s\t%s\t%s\n' % (
                    idx, job.run_sec, job.start_time,
                    job.get_state(), job.end_time
                ))
            self.write('</pre>')


class MyApplication(web.Application):

    def __init__(self):

        # to store tasks
        self.jobs = []

        super(MyApplication, self).__init__([
            (r"/", MainHandler),
            (r"/(add)/(\d*)", MainHandler),
            (r"/(cancel)/(\d*)", MainHandler),
        ])

if __name__ == "__main__":
    MyApplication().listen(8888)
    IOLoop.current().start()

Add a couple of jobs:

for a in `seq 12 120`; do curl http://127.0.0.1:8888/add/$a; done

Then cancel some of them by requesting /cancel/ID, and open / to see each job's state. Note that this requires only Tornado.

This example is very simple; the gen.sleep stands in for your heavy task. Of course, not all jobs are as simple to implement in a cancellable way.
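When the heavy task is CPU-bound rather than an async sleep, the same flag idea can still apply if the work can be split into chunks, with the flag checked between them. The following is only a minimal sketch under that assumption: CancellableJob, _process_chunk and the chunk counts are hypothetical names, and the standard-library thread pool is a stand-in, not part of the code above.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class CancellableJob:
    """Hypothetical chunked job that checks a cancel flag between chunks."""

    def __init__(self, total_chunks):
        self.total_chunks = total_chunks
        self.done_chunks = 0
        # threading.Event is a thread-safe flag, so cancel() may be
        # called from a different thread than the one executing run().
        self._cancel_event = threading.Event()

    def cancel(self):
        """Ask the job to stop at its next check."""
        self._cancel_event.set()

    def _process_chunk(self, index):
        # Stand-in for one slice of real work (assumption: the real
        # task can be divided into slices like this).
        time.sleep(0.01)

    def run(self):
        """Return True if every chunk was processed, False if cancelled."""
        for index in range(self.total_chunks):
            if self._cancel_event.is_set():
                return False
            self._process_chunk(index)
            self.done_chunks += 1
        return True


if __name__ == "__main__":
    executor = ThreadPoolExecutor(max_workers=2)
    job = CancellableJob(total_chunks=1000)
    future = executor.submit(job.run)   # job runs in a worker thread
    time.sleep(0.05)                    # let it process a few chunks
    job.cancel()                        # e.g. triggered by /cancel/ID
    print(future.result(), job.done_chunks)
    executor.shutdown()
```

Running the job in a worker thread keeps Tornado's IOLoop free to serve the cancel request; a blocking task called directly inside a handler would keep the loop busy until it finished. With Tornado 5.0 or later, something like IOLoop.current().run_in_executor(executor, job.run) could be used from a handler in place of executor.submit.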

Upvotes: 1
