adam

Reputation: 47

Async process pool executor not working in tornado

I feel like I am far from good practice at this point. I am using the Tornado framework while also leveraging a ProcessPoolExecutor:

import tornado.ioloop
import datetime
import tornado.web
import azure.functions as func
import json
import logging
import requests
import sys
import os
import pymongo
import mongo_config
import re
import concurrent.futures
from azure_model import Pytorch_Azure

MONGO_URL = mongo_config.uri()
mongo_client = pymongo.MongoClient(MONGO_URL)
db = mongo_client['db']
prods = mongo_client['db']['products']

pta = Pytorch_Azure()

def parallel_pred(img):
    r = requests.get(img, timeout = 10)
    img_id = img.split('/')[-1].split('.')[0]
    img_name = 'tmp{}.png'.format(img_id)
    with open(img_name, 'wb') as f:
        f.write(r.content)
    prediction = pta.predict(img_name)
    os.remove(img_name)
    return prediction

class Predictionator(tornado.web.RequestHandler):
    def data_received(self, chunk):
        pass

    def get(self):
        merchant_id = self.get_argument('id', None, True)
        prod_type = self.request.uri.split('category=')[1].split('&id=')[0].replace('%20', ' ').replace('%26', '&').replace('%27', '\'')
        pred_list = []
        outputs = {}
        print(type(prod_type))
        if merchant_id and prod_type:

            counter = 0
            try:
                print(prod_type)
                for i in prods.find({'merchant': int(merchant_id), 'details.product_type':re.compile('^' + prod_type + '$', re.IGNORECASE)}):


                    prod_img = i['merchantImages'][0]
                    if prod_img not in pred_list:
                        pred_list.append(prod_img)
                        counter += 1
                        if counter == 5:
                            break
            except:
                self.write({'body': 'There was an error with the query. Please ensure you are using a correct merchant id and product type'})
            print(pred_list)

            if pred_list:

                try:   

                    executor = concurrent.futures.ProcessPoolExecutor(4)
                    for pred_out in executor.map(parallel_pred, pred_list, timeout = 15):
                        if pred_out['label'] not in outputs.keys():
                            outputs[pred_out['label']] = 1
                        else:
                            outputs[pred_out['label']] += 1


                except:
                    self.write({'body': 'There was an issue making the predictions.'})


                if outputs:
                    prediction = {}
                    prediction['label'] = max(outputs, key = outputs.get)
                    prediction['object_id'] = db.categories.find_one({'name':prediction['label']})['_id']

                    print(outputs)
                    self.write(json.dumps(prediction))
                else:
                    self.write({'statusCode': 400, 'body':'An error occurred.'})
            else:
                self.write({'statusCode': 400, 'body':'There were no results returned. Please ensure the id parameter has a valid merchant id and the category id has a valid product type'})
        else:
            self.write({'statusCode': 400, 'body':'Please pass a name on the query string or in the request body'})




def make_app():
    return tornado.web.Application([
        (r'/categorize',Predictionator),
    ])

def start_nado():
    print('starting nado')
    app = make_app()
    server = app.listen(8888)
    return server

def restart():
    python = sys.executable
    os.execl(python, python, * sys.argv)

def stop_nado():
    ioloop = tornado.ioloop.IOLoop.instance()
    ioloop.add_callback(ioloop.stop)
    ioloop.add_callback(ioloop.close)
    print('stopping nado')

def main():
    while True:
        try:
            try:
                server = start_nado()
                tornado.ioloop.IOLoop.current().add_timeout(datetime.timedelta(seconds=600), stop_nado)
                tornado.ioloop.IOLoop.current().start()
            except OSError:
                print('restarting')
                restart()
        except KeyboardInterrupt:
            tornado.ioloop.IOLoop.instance().stop()
            break


if __name__ == "__main__":
    try:
        main()
    except OSError:
        tornado.ioloop.IOLoop.instance().stop()
        main()

The main issue comes with the Predictionator class. The idea is that it takes 5 products from a database, makes a prediction on each, and returns the class that received the most predictions. That worked fine, but it took a while, so we wanted to parallelize it with processes.

The first problem was a hang-up: it would make predictions on two images and then become completely unresponsive. This is when the tornado.ioloop.IOLoop.current().add_timeout(datetime.timedelta(seconds=600), stop_nado) call became the solution, essentially restarting the Tornado server every 10 minutes. After this, there was an OSError: [Errno 24] Too many open files. This is when the restart function became the [hacky] solution, which pretty much just restarts the whole program. This setup worked reasonably well for about 2 days, after which the server it was running on became completely unresponsive.

At this point, I am just looking for a pointer in the right direction. I doubt Tornado is the problem, but should I use a different framework altogether? I am pretty new to Tornado and to parallel processing in Python. Thank you.

Upvotes: 1

Views: 603

Answers (1)

xyres

Reputation: 21789

That's because you're creating 4 new processes every time the if pred_list condition is satisfied, and never shutting them down, so each request leaks the pipes those worker processes use until you hit the open-files limit.

Generally, in Tornado programs you create a global executor object and reuse it.

# create a global object
executor = concurrent.futures.ProcessPoolExecutor(4)

class Predictionator(...):
    ...
    def get(self):
        ...
        # use the global `executor` object instead of creating a new one
        for pred_out in executor.map(...):
            ...

Another approach is to create the executor in a with ... as statement; the processes will be automatically closed and cleaned up when their tasks are done (please see the note at the end of the answer before using this method).

def get(self):
    ...
    with concurrent.futures.ProcessPoolExecutor(4) as executor:
        for pred_out in executor.map(...):
            ...

Note: The first approach will give you better performance. In the second approach there's the overhead of creating and closing processes on every request.
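
To make the first approach concrete, here's a minimal, self-contained sketch (not your exact app). It assumes Tornado 5+ on Python 3.5+; slow_predict and PredictHandler are hypothetical stand-ins for pta.predict and Predictionator, and IOLoop.run_in_executor is used so the shared pool is awaited instead of blocking the IOLoop:

import concurrent.futures

import tornado.gen
import tornado.ioloop
import tornado.web

# One pool for the whole process, created once and reused by every
# request, so no extra worker processes or pipes are opened per request.
executor = concurrent.futures.ProcessPoolExecutor(4)

def slow_predict(x):
    # Hypothetical stand-in for a CPU-bound call such as pta.predict.
    # Must be a module-level function so it can be pickled for the pool.
    return x * x

class PredictHandler(tornado.web.RequestHandler):
    async def get(self):
        items = [1, 2, 3, 4, 5]
        loop = tornado.ioloop.IOLoop.current()
        # Submit each job to the shared pool; run_in_executor returns an
        # awaitable, so the IOLoop keeps serving other requests meanwhile.
        futures = [loop.run_in_executor(executor, slow_predict, i) for i in items]
        results = await tornado.gen.multi(futures)
        self.write({'results': results})

def main():
    app = tornado.web.Application([(r'/predict', PredictHandler)])
    app.listen(8888)
    tornado.ioloop.IOLoop.current().start()

if __name__ == '__main__':
    main()

If you ever need to tear the pool down (for example at process exit), call executor.shutdown() once rather than per request. Your original code never shuts its per-request pools down, which matches the Errno 24 symptom you saw.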

Upvotes: 1
