I have a Tornado web service that will serve around 500 requests per minute, all of them hitting one specific endpoint. I compiled a C++ program with Cython and use it inside the Tornado service as my processing engine. Each request to `/check/` triggers a function call in the C++ program (I will call it `handler`), and the return value is sent to the user as the response.
This is how I wrap the `handler` class. One important point is that I do not instantiate the `handler` in `__init__`. Instead, another route in my Tornado code (e.g. `/reload/`) starts loading the DataStructure after an authorized request hits it.
```python
from concurrent.futures import ThreadPoolExecutor

import tornado.ioloop
import tornado.web

# pDataStructureHandler (the Cython-compiled C++ wrapper), BasicAuthMixin,
# credentials, options, MainHandler, HealthHandler, SubmitLogHandler and the
# remaining CheckerInstance methods are defined elsewhere (omitted here).

executors = ThreadPoolExecutor(max_workers=4)


class CheckerInstance(object):
    def __init__(self, *args, **kwargs):
        self.handler = None
        self.is_loading = False
        self.is_live = False

    def init(self):
        if not self.handler:
            # Build the C++ data structure; this is the slow part.
            self.handler = pDataStructureHandler()
            self.handler.add_words_from_file(self.data_file_name)
            self.end_loading()
            self.go_live()

    def renew(self):
        self.handler = None
        self.init()


class CheckHandler(tornado.web.RequestHandler):
    async def get(self):
        query = self.get_argument("q", None).encode('utf-8')
        answer = query
        if not checker_instance.is_live:
            # Fake mode: echo the query back while the data structure reloads.
            self.write(dict(answer=self.get_argument("q", None), confidence=100))
            return
        checker_response = await checker_instance.get_response(query)
        answer = checker_response[0]
        confidence = checker_response[1]
        if self.request.connection.stream.closed():
            return
        self.write(dict(correct=answer, confidence=confidence, is_cache=is_cache))

    def on_connection_close(self):
        self.wait_future.cancel()


class InstanceReloadHandler(BasicAuthMixin, tornado.web.RequestHandler):
    def prepare(self):
        self.get_authenticated_user(check_credentials_func=credentials.get, realm='Protected')

    def new_file_exists(self):
        return True

    def can_reload(self):
        return not checker_instance.is_loading

    def get(self):
        error = False
        message = None
        if not self.can_reload():
            error = True
            message = 'another job is being processed!'
        else:
            if not self.new_file_exists():
                error = True
                message = 'no new file found!'
            else:
                checker_instance.go_fake()
                checker_instance.start_loading()
                # Run the reload in a worker thread so this request can return.
                tornado.ioloop.IOLoop.current().run_in_executor(executors, checker_instance.renew)
                message = 'job started!'
        if self.request.connection.stream.closed():
            return
        self.write(dict(
            success=not error, message=message
        ))

    def on_connection_close(self):
        self.wait_future.cancel()


def main():
    app = tornado.web.Application(
        [
            (r"/", MainHandler),
            (r"/check", CheckHandler),
            (r"/reload", InstanceReloadHandler),
            (r"/health", HealthHandler),
            (r"/log-event", SubmitLogHandler),
        ],
        debug=options.debug,
    )


checker_instance = CheckerInstance()
```
I want this service to keep responding after `checker_instance.renew` starts running in another thread, but that is not what happens. When I hit the `/reload/` endpoint and the `renew` function starts working, every request to `/check/` halts and waits for the reload to finish; only then does it start responding again. While the DataStructure is being loaded, the service should be in fake mode and respond with the same query the user sent as input.
I have tested this code in my development environment on an i5 CPU (4 cores) and it works fine, but in the production environment (3 CPU cores, 2 threads each) the `/check/` endpoint halts requests.
It is difficult to fully trace the events being handled because you have clipped out some of the code for brevity. For instance, I don't see a `get_response` implementation here, so I don't know whether it is itself awaiting something that could depend on the state of `checker_instance`.
One area I would explore is thread safety (or the apparent lack of it) when passing `checker_instance.renew` to `run_in_executor`. This feels questionable to me because you are mutating the state of a single `CheckerInstance` object from a separate thread. While it might not break things outright, it could introduce odd race conditions or unanticipated copies of memory that might explain the unexpected behavior you are experiencing.
If possible, I would make the loading work you want to offload to a thread completely self-contained and, once the data is loaded, return it as the function result, which can then be fed back into your `checker_instance`. With the code as-is, you would `await` the `run_in_executor` call for its result and then update `checker_instance`, which means the reload GET request would wait until the data is loaded. Alternatively, in your reload GET request you could use `ioloop.spawn_callback` to schedule a function that calls `run_in_executor` in this manner, allowing the reload request to complete instead of waiting.