Reputation: 41
I'm familiar with event-driven programming, but I have run into this problem and I have run out of possible solutions. I read the Tornado documentation and tried several approaches, but I was not able to solve the following problem:
I have a websocket server that just listens for new messages and, based on the message type, calls a specific function:
    class WebSocketHandler(tornado.websocket.WebSocketHandler):
        ...
        def on_message(self, message):
            # Compare by value (==), not identity (is)
            if message['type'] == X:
                self.write_message(functionA(message['data']))
            elif message['type'] == Y:
                self.write_message(functionB(message['data']))
        ...
The problem comes when a computationally expensive function is executed. For example, functionA can take up to 5 minutes to finish:
    def functionA(message):
        params = extract_params(message)
        cmd = "computationally_expensive_tool"
        out = check_output(cmd, shell=True, stderr=STDOUT, cwd=working_dir)
        ...
        return json.dumps({
            "error": False,
            "message": "computationally_expensive_tool_execution_terminated",
            "type": X
        })
My question is: how can I execute that function asynchronously, so that I can still handle other messages, and then handle the result of functionA when it is ready?
Upvotes: 0
Views: 201
Reputation: 22154
If functionA is a blocking function that cannot be made asynchronous, you probably want to run it in a thread pool:
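    import concurrent.futures
    from tornado import gen

    # One shared pool for blocking calls
    executor = concurrent.futures.ThreadPoolExecutor()

    @gen.coroutine
    def on_message(self, message):
        if message['type'] == X:
            # Runs in a worker thread; the IOLoop stays free meanwhile
            result = yield executor.submit(functionA, message['data'])
            self.write_message(result)
        elif message['type'] == Y:
            self.write_message(functionB(message['data']))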
This will block this websocket until functionA finishes, but allow other connections to continue working. If you need to continue processing other kinds of messages from the same connection while functionA runs, you need a more complicated arrangement, possibly involving a tornado.queues.Queue.
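To give a rough idea of what such a queue-based arrangement could look like, here is a minimal sketch. It is not Tornado code: it uses only the standard library so it runs on its own, with asyncio.Queue standing in for tornado.queues.Queue and loop.run_in_executor standing in for submitting to the thread pool. The Connection class, function_a, function_b, and the "X"/"Y" type values are hypothetical stand-ins for the handler and functions in the question, and the 0.2 second sleep is a placeholder for the expensive tool. The idea is that on_message only enqueues the slow job and returns, while a per-connection worker drains the queue and sends the result when it is ready.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=2)


def function_a(data):
    """Hypothetical stand-in for the blocking functionA."""
    time.sleep(0.2)  # placeholder for the expensive external tool
    return "A done: " + data


def function_b(data):
    """Hypothetical stand-in for a cheap handler."""
    return "B done: " + data


class Connection:
    """Hypothetical stand-in for one websocket connection."""

    def __init__(self):
        self.sent = []               # records what write_message would send
        self.jobs = asyncio.Queue()  # slow jobs wait here

    def write_message(self, text):
        self.sent.append(text)

    async def worker(self):
        # Drains slow jobs one at a time, running each in the thread pool.
        loop = asyncio.get_running_loop()
        while True:
            data = await self.jobs.get()
            result = await loop.run_in_executor(executor, function_a, data)
            self.write_message(result)
            self.jobs.task_done()

    def on_message(self, message):
        # Returns immediately, so the next message is handled right away.
        if message["type"] == "X":
            self.jobs.put_nowait(message["data"])
        elif message["type"] == "Y":
            self.write_message(function_b(message["data"]))


async def demo():
    conn = Connection()
    worker = asyncio.create_task(conn.worker())
    conn.on_message({"type": "X", "data": "slow"})
    conn.on_message({"type": "Y", "data": "fast"})
    await conn.jobs.join()  # wait until the slow job has been answered
    worker.cancel()
    return conn.sent


sent = asyncio.run(demo())
print(sent)
```

In this sketch the fast "Y" reply is sent before the slow "X" reply, even though "X" arrived first. In a real handler you would presumably start the worker when the connection opens and stop it when the connection closes, and the client needs to cope with replies arriving out of order.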
Upvotes: 1