vidhan
vidhan

Reputation: 129

How to implement multithreading with tornado?

I am working with python2.7 with futures module installed.

I am trying to implement multithreading in tornado using ThreadPoolExecutor.

Here is the code that I have implemented.

from __future__ import absolute_import
from base_handler import BaseHandler
from tornado import gen
from pyrestful import mediatypes
from pyrestful.rest import get, post, put, delete
from bson.objectid import ObjectId
from spark_map import Map
from concurrent import futures
import tornado
class MapService(BaseHandler):

     MapDB = dict()
     executor = futures.ProcessPoolExecutor(max_workers=3)

     @tornado.web.asynchronous
     @gen.coroutine
     @post(_path='/map', _type=[str, str])
     def postMap(self, inp, out):
         db = self.settings['db']
         function = lambda (x,y): (x,y[0]*2)
         future = yield db.MapInfo.insert({'input': inp, 'output': out, 'input_function': str(function)})
         response = {"inserted ID": str(future)}
         self.write(response)

         m = Map(inp, out, function, appName=str(future))
         futuree = self.executor.submit(m.operation())  
         self.MapDB[str(future)] = {'map_object': m, 'running_process_future_object': futuree}
         self.finish()

     @tornado.web.asynchronous
     @gen.coroutine
     @delete(_path='/map/{_id}', _types=[str])
     def deleteMap(self, _id):
         db = self.settings['db']
         future = yield db.MapInfo.find_one({'_id': ObjectId(_id)})
         if future is None:
             raise AttributeError('No entry exists in the database with the provided ID')
         chk = yield db.MapInfo.remove(future)
         response = { "Succes": "OK" }
         self.write(response)

         self.MapDB[_id]['map_object'].stop()
         del self.MapDB[_id]
         self.finish()

In the above code, I receive two inputs using the post request in inp and out. Then I perform some operation with them. This operation should last until a delete request is received to stop and remove the process.

The problem I am facing is with the multiple requests. It only executes the first request while other requests wait for the first one to complete thus blocking the main IOLoop.

So, I want to run each process in a separate thread. How should I implement it?

Upvotes: 0

Views: 1443

Answers (1)

A. Jesse Jiryu Davis
A. Jesse Jiryu Davis

Reputation: 24017

It appears that m.operation() is blocking, so you need to run it on a thread. The way you're doing it blocks the main thread while calling m.operation(), and spawns a thread after:

self.executor.submit(m.operation())

You want, instead, to pass the function to a thread which will execute it:

self.executor.submit(m.operation)

No parens.

Upvotes: 1

Related Questions