yanachen

Reputation: 3753

Is it right to initialize a thread pool in a class's __init__?

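import json
import logging
import os
import threading
from collections import deque
from multiprocessing.dummy import Pool as ThreadPool

import redis
from flask import Flask, request

app = Flask(__name__)
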
class TSNew:
    def __init__(self):
        self.redis_client = redis.StrictRedis(host="172.17.31.147", port=4401, db=0)
        self.global_switch = 0
        self.pool = ThreadPool(40) # init pool
        self.dnn_model = None
        self.nnf = None
        self.md5sum_nnf = "initialize"
        self.thread = threading.Thread(target=self.load_model_item)
        self.ts_picked_ids = None
        self.thread.start()

        self.memory = deque(maxlen=3000)
        self.process = threading.Thread(target=self.process_user_dict)
        self.process.start()

    def load_model_item(self):
        '''
        code
        '''
    def predict_memcache(self,user_dict):
        '''
        code
        '''
    def process_user_dict(self):
        while True:
            '''
            code to generate user_dicts which is a list 
            '''
            results = self.pool.map(self.predict_memcache, user_dicts)
            '''
            code
            '''
TSNew_ = TSNew()

def get_user_result():
    logging.info("----------------come in ------------------")
    if request.method == 'POST':
        user_dict_json = request.get_data()# userid
        if user_dict_json == '' or user_dict_json is None:
            logging.info("----------------user_dict_json is ''------------------")
            return ''
        try:
            user_dict = json.loads(user_dict_json)
        except:
            logging.info("json load error, pass")
            return ''
        TSNew_.memory.append(user_dict)
        logging.info('add to deque TSNew_.memory size: %d  PID: %d', len(TSNew_.memory), os.getpid())
        logging.info("add to deque userid: %s, nation: %s \n",user_dict['user_id'],  user_dict['user_country'])
        return 'SUCCESS\n'


@app.route('/', methods=['POST'])
def get_ts_gbdt_id():
    return get_user_result()

from werkzeug.contrib.fixers import ProxyFix
app.wsgi_app = ProxyFix(app.wsgi_app)

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=4444)

I create a thread pool in the class's __init__ and use self.pool to map the predict_memcache function over the inputs. I have two questions: (a) Should I initialize the pool in __init__, or create it right before this call?

results = self.pool.map(self.predict_memcache, user_dicts)

(b) The pool is multithreaded, and its map call runs inside the thread that executes process_user_dict. Could this cause any hidden errors? Thanks.

Upvotes: 1

Views: 1042

Answers (1)

shmee

Reputation: 5101

Question (a):

It depends. If you need to run process_user_dict more than once, then it makes sense to start the pool in the constructor and keep it running. Creating a thread pool always comes with some overhead and by keeping the pool alive between calls to process_user_dict you would avoid that additional overhead.

If you just want to process one set of input, you can just as well create your pool inside process_user_dict. But probably not right before results = self.pool.map(self.predict_memcache, user_dicts), because that would create a new pool on every iteration of the surrounding while loop.

In your specific case, it does not make any difference. You create your TSNew_ object on module-level, so that it remains alive (and with it the thread pool) while your app is running; the same thread pool from the same TSNew instance is used to process all the requests during the lifetime of app.run(). Since you seem to be using that construct with self.process = threading.Thread(target=self.process_user_dict) as some sort of listener on self.memory, creating the pool in the constructor is functionally equivalent to creating the pool inside of process_user_dict (but outside the loop).
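To illustrate the point about reusing the pool, here is a minimal sketch. The Worker class, its predict method and process_batch are hypothetical stand-ins for TSNew, predict_memcache and one iteration of the loop; only the pool handling mirrors the question.

```python
from multiprocessing.dummy import Pool as ThreadPool


class Worker:
    """Hypothetical stand-in for TSNew; only the pool handling is shown."""

    def __init__(self, n_threads=4):
        # Created once; every later call to process_batch reuses it.
        self.pool = ThreadPool(n_threads)

    def predict(self, user_dict):
        # Placeholder for predict_memcache.
        return user_dict["user_id"] * 2

    def process_batch(self, user_dicts):
        # No pool is created here, so there is no per-batch startup cost.
        return self.pool.map(self.predict, user_dicts)

    def close(self):
        # Stop accepting work and wait for the worker threads to exit.
        self.pool.close()
        self.pool.join()


worker = Worker()
first = worker.process_batch([{"user_id": 1}, {"user_id": 2}])
second = worker.process_batch([{"user_id": 3}])
worker.close()
```

Both batches go through the same pool object, which is what happens in the question as long as the pool is created outside the while loop.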

Question (b):

Technically, creating a thread inside a thread does not introduce any hidden error by default. In the end, the ultimate parent of any additional thread is always the MainThread, which is created implicitly for every instance of a Python interpreter. Basically, every time you create a thread inside a Python program, you create a thread in a thread.

Actually, your code does not even create a thread inside a thread. Your self.pool is created inside the MainThread. When the pool is instantiated via self.pool = ThreadPool(40), it creates the desired number (40) of worker threads, plus one worker handler thread, one task handler thread and one result handler thread. All of these are child threads of the MainThread. The only thing the thread under self.process does with the pool is call its map method to assign tasks to it.
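You can check this yourself with a small sketch along these lines. It uses a pool of 4 instead of 40 to keep the output short, and run_map is a hypothetical stand-in for process_user_dict. The exact number of helper threads is a CPython implementation detail, so treat the count as indicative.

```python
import threading
from multiprocessing.dummy import Pool as ThreadPool

# Snapshot of the threads that exist before the pool is created.
before = set(threading.enumerate())

pool = ThreadPool(4)

# Threads started by the ThreadPool constructor: the workers plus the
# pool's internal handler threads.
new_threads = set(threading.enumerate()) - before
new_count = len(new_threads)
print("threads started by ThreadPool(4):", new_count)


def run_map(out):
    # Runs in a separate thread, like process_user_dict in the question.
    # It only submits work; it does not create the pool or its threads.
    out.append(pool.map(lambda x: x * x, range(5)))


out = []
caller = threading.Thread(target=run_map, args=(out,))
caller.start()
caller.join()

pool.close()
pool.join()
```

The pool's threads already exist before the caller thread starts, and calling map from that thread works like calling it from the MainThread.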

However, I do not really see the point of self.process here. My guess is that you want the loop in process_user_dict to act as a kind of listener on self.memory, so that the pool starts processing user_dict entries as soon as they show up in the deque. From what you do in get_user_result, you seem to get one user_dict per request. I understand that you might have concurrent user sessions passing in these dicts, but do you really see any benefit from running process_user_dict in an infinite loop over simply calling TSNew_.process_user_dict() after TSNew_.memory.append(user_dict)? You could even omit self.memory completely and pass the dict directly to process_user_dict, unless I am missing something you did not show us.
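A rough sketch of the direct hand-off, without the deque and without the polling loop, might look like this. DirectProcessor, predict and submit are hypothetical names, and using apply_async so the caller does not block is an assumption on top of the suggestion above, not something taken from your code.

```python
from multiprocessing.dummy import Pool as ThreadPool


class DirectProcessor:
    """Hypothetical variant of TSNew without self.memory or a listener thread."""

    def __init__(self, n_threads=4):
        self.pool = ThreadPool(n_threads)

    def predict(self, user_dict):
        # Placeholder for predict_memcache.
        return "scored:%s" % user_dict["user_id"]

    def submit(self, user_dict):
        # Hand a single dict to the pool and return immediately.
        # The returned AsyncResult can be ignored or waited on.
        return self.pool.apply_async(self.predict, (user_dict,))


processor = DirectProcessor()

# In the request handler this call would replace TSNew_.memory.append(user_dict).
pending = processor.submit({"user_id": "u1", "user_country": "XX"})
result = pending.get(timeout=5)

processor.pool.close()
processor.pool.join()
```

Whether this fits depends on what the elided parts of process_user_dict do, for example whether they batch several dicts before predicting.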

Upvotes: 2
