Hossein
Hossein

Reputation: 26014

Why doesn't pool.map initiate a new process?

I'm trying to run a member method in a separate subprocess. The member method is actually acting as a callback dispatcher loop where it executes each callback.

The issue I'm facing is that the callbacks never get called. The method which is responsible for spawning the new process does get executed but I get no output/signal from the actual dispatcher loop or the callback itself.

The new process is spawned using this snippet :

def run_callback_disptacher(self):
    pool = pathos.multiprocessing.Pool(1)
    pool.map(self.execute_callbacks, [])
    print('run_callback_disptacher executed')

and the dispatcher callback is this:

def execute_callbacks(self):
    print(f'dispatcher list: {self.callback_list}')
    while True:
        for callback in self.callback_list:
            callback()

I am using pathos.multiprocessing module in Windows10 and this is a minimal example demonstrating the issue:

import time
import torch 
import torchvision
from torchvision import models
from torch.utils import mkldnn as mkldnn_utils
import pathos

class SomeClass():
    def __init__(self, model_name='r18', use_jit=False, use_mkldnn=False, device='cpu'):
        self.model_name = model_name
        self.use_jit = use_jit
        self.use_mkldnn = use_mkldnn 
        self.device = device
        self.callback_list = []
        self.is_running = False
        self._init_model()

    def _init_model(self):
        if self.model_name == 'r18':
            self.model = models.resnet18(pretrained=True)
        elif self.model_name == 'r50':
            self.model = models.resnet50(pretrained=True)
        else:
            raise Exception(f"Model name: '{self.model_name}' is not recognized.")

        self.model = self.model.to(self.device)
        self.model.eval()

        if self.use_mkldnn:
            self.model = mkldnn_utils.to_mkldnn(self.model)
        if self.use_jit: 
            self.model = self.load_jit_model()

    def load_jit_model(self, jit_path = 'model.jit'):
        dummy_input = torch.tensor(torch.rand(size=(1, 3, 224, 224)))
        model = torch.jit.trace(self.model, dummy_input)
        torch.jit.save(model, jit_path)
        return torch.jit.load(jit_path)

    def add_callback(self, callback):
        self.callback_list.append(callback)

    def remove_callback(self, callback):
        self.callback_list.remove(callback)

    def get_callbacks(self):
        return self.callback_list

    def execute_callbacks(self):
        print(f'dispatcher list: {self.callback_list}')
        while True:
            for callback in self.callback_list:
                callback()

    def start(self):
        self.is_running = True
        while self.is_running:
            # simulating a generic operation here
            time.sleep(0.2)
        print('start ended!')

    def stop(self):
        self.is_running = False

    def run_callback_disptacher(self):
        pool = pathos.multiprocessing.Pool(1)
        pool.map(self.execute_callbacks, [])
        print('run_callback_disptacher executed')

and this is how it is called :

import threading
import time
from minimal_example import SomeClass

def simple_callback():
    print('hello from simple callback')

def start():
    obj = SomeClass(model_name='r18', use_jit=False, use_mkldnn=False, device='cpu')
    obj.add_callback(simple_callback)
    obj.run_callback_disptacher()
    starter = threading.Thread(target=obj.start)
    starter.start()
    time.sleep(5)
    print(obj.get_callbacks())
    obj.stop()
    print('Done!')

if __name__ == '__main__':
    start()

What am I missing here?

Upvotes: 0

Views: 77

Answers (0)

Related Questions