Reputation: 47
I feel like I am far from good practice at this point. I am using the Tornado framework while also leveraging ProcessPoolExecutor:
import tornado.ioloop
import datetime
import tornado.web
import azure.functions as func
import json
import logging
import requests
import sys
import os
import pymongo
import mongo_config
import re
import concurrent.futures

from azure_model import Pytorch_Azure

MONGO_URL = mongo_config.uri()
mongo_client = pymongo.MongoClient(MONGO_URL)
db = mongo_client['db']
prods = mongo_client['db']['products']

pta = Pytorch_Azure()


def parallel_pred(img):
    # Download the image to a temporary file, predict on it, then clean up
    r = requests.get(img, timeout=10)
    img_id = img.split('/')[-1].split('.')[0]
    img_name = 'tmp{}.png'.format(img_id)
    with open(img_name, 'wb') as f:
        f.write(r.content)
    prediction = pta.predict(img_name)
    os.remove(img_name)
    return prediction


class Predictionator(tornado.web.RequestHandler):
    def data_received(self, chunk):
        pass

    def get(self):
        merchant_id = self.get_argument('id', None, True)
        prod_type = self.request.uri.split('category=')[1].split('&id=')[0].replace('%20', ' ').replace('%26', '&').replace('%27', '\'')
        pred_list = []
        outputs = {}
        print(type(prod_type))
        if merchant_id and prod_type:
            counter = 0
            try:
                print(prod_type)
                # Collect up to 5 distinct product images for this merchant and type
                for i in prods.find({'merchant': int(merchant_id), 'details.product_type': re.compile('^' + prod_type + '$', re.IGNORECASE)}):
                    prod_img = i['merchantImages'][0]
                    if prod_img not in pred_list:
                        pred_list.append(prod_img)
                        counter += 1
                    if counter == 5:
                        break
            except:
                self.write({'body': 'There was an error with the query. Please ensure you are using a correct merchant id and product type'})
            print(pred_list)
            if pred_list:
                try:
                    executor = concurrent.futures.ProcessPoolExecutor(4)
                    for pred_out in executor.map(parallel_pred, pred_list, timeout=15):
                        if pred_out['label'] not in outputs.keys():
                            outputs[pred_out['label']] = 1
                        else:
                            outputs[pred_out['label']] += 1
                except:
                    self.write({'body': 'There was an issue making the predictions.'})
                if outputs:
                    prediction = {}
                    prediction['label'] = max(outputs, key=outputs.get)
                    prediction['object_id'] = db.categories.find_one({'name': prediction['label']})['_id']
                    print(outputs)
                    self.write(json.dumps(prediction))
                else:
                    self.write({'statusCode': 400, 'body': 'An error occurred.'})
            else:
                self.write({'statusCode': 400, 'body': 'There were no results returned. Please ensure the id parameter has a valid merchant id and the category id has a valid product type'})
        else:
            self.write({'statusCode': 400, 'body': 'Please pass a name on the query string or in the request body'})


def make_app():
    return tornado.web.Application([
        (r'/categorize', Predictionator),
    ])


def start_nado():
    print('starting nado')
    app = make_app()
    server = app.listen(8888)
    return server


def restart():
    # Re-exec the current program in place (the "hacky" full restart)
    python = sys.executable
    os.execl(python, python, *sys.argv)


def stop_nado():
    ioloop = tornado.ioloop.IOLoop.instance()
    ioloop.add_callback(ioloop.stop)
    ioloop.add_callback(ioloop.close)
    print('stopping nado')


def main():
    while True:
        try:
            try:
                server = start_nado()
                # Stop (and then restart) the server every 10 minutes
                tornado.ioloop.IOLoop.current().add_timeout(datetime.timedelta(seconds=600), stop_nado)
                tornado.ioloop.IOLoop.current().start()
            except OSError:
                print('restarting')
                restart()
        except KeyboardInterrupt:
            tornado.ioloop.IOLoop.instance().stop()
            break


if __name__ == "__main__":
    try:
        main()
    except OSError:
        tornado.ioloop.IOLoop.instance().stop()
        main()
The main issue is with the Predictionator class. The idea is that it takes 5 products from a database, makes a prediction on each, and returns whichever class received the most predictions. That works fine, but it took a while, so we wanted to parallelize it across processes.
The first problem was a hang: it would make predictions on two of the images and then become completely unresponsive. That is when the tornado.ioloop.IOLoop.current().add_timeout(datetime.timedelta(seconds=600), stop_nado)
call became the solution, essentially restarting the Tornado server every 10 minutes.
After this, an OSError: [Errno 24] Too many open files
error appeared. That is when the restart
function became the (hacky) solution, which pretty much just re-executes the whole program.
This entire setup worked fairly well for about two days, after which the server it was running on became completely unresponsive. At this point I am just looking for a pointer in the right direction. I doubt Tornado itself is the problem, but should I use a different framework altogether? I am fairly new to Tornado and to parallel processing in Python.
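(In case it helps with diagnosis: the open-descriptor count can be watched from inside the process. A minimal sketch, assuming psutil is installed; note that num_fds() is Unix-only.)

import psutil

def log_fd_count(tag=''):
    # Counts every descriptor this process holds: files, sockets, and
    # the pipes each ProcessPoolExecutor opens to its workers
    print('{}: {} open fds'.format(tag, psutil.Process().num_fds()))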
Thank you
Upvotes: 1
Views: 603
Reputation: 21789
Because you're creating 4 new processes every time the if pred_list
condition is satisfied. Generally, in Tornado programs you create one global executor
object and reuse it.
# create a global object
executor = concurrent.futures.ProcessPoolExecutor(4)

class Predictionator(...):
    ...
    def get(self):
        ...
        # use the global `executor` object instead of creating a new one
        for pred_out in executor.map(...):
            ...
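One thing to be aware of: even with a shared pool, executor.map blocks the handler (and with it the whole IOLoop) until every prediction is done, which matches the unresponsiveness you describe. On Tornado 5+ you can instead make the handler a coroutine and dispatch the work through IOLoop.run_in_executor, so other requests keep being served. A rough sketch, assuming the global executor above and your parallel_pred function:

import tornado.gen
import tornado.ioloop
import tornado.web

class Predictionator(tornado.web.RequestHandler):
    async def get(self):
        pred_list = ...  # built from MongoDB exactly as in your handler
        loop = tornado.ioloop.IOLoop.current()
        # Queue each prediction on the shared pool; awaiting the futures
        # lets the IOLoop keep serving other requests in the meantime
        futures = [loop.run_in_executor(executor, parallel_pred, img)
                   for img in pred_list]
        results = await tornado.gen.multi(futures)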
Another approach is to create the executor
in a with...as
statement, so the processes are automatically closed and cleaned up when their tasks are done (please see the note at the end of the answer before using this method).
def get(self):
    ...
    with concurrent.futures.ProcessPoolExecutor(4) as executor:
        for pred_out in executor.map(...):
            ...
Note: the first approach will give you better performance; with the second, you pay the overhead of creating and tearing down the processes on every request.
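One more caveat that ties into your Errno 24: os.execl() replaces the process image without running any cleanup, so a pool that is still alive at that point leaves its worker processes (and their pipes) behind. If you keep the periodic-restart scheme, it is worth shutting the global pool down first. A minimal sketch of your restart function, assuming the global executor from above:

def restart():
    # os.execl() performs no cleanup, so reclaim the pool's worker
    # processes explicitly before replacing the process image
    executor.shutdown(wait=True)
    python = sys.executable
    os.execl(python, python, *sys.argv)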
Upvotes: 1