Reputation: 26014
I'm trying to run a member method in a separate subprocess. The member method is actually acting as a callback dispatcher loop where it executes each callback.
The issue I'm facing is that the callbacks never get called. The method which is responsible for spawning the new process does get executed but I get no output/signal from the actual dispatcher loop or the callback itself.
The new process is spawned using this snippet :
def run_callback_disptacher(self):
pool = pathos.multiprocessing.Pool(1)
pool.map(self.execute_callbacks, [])
print('run_callback_disptacher executed')
and the dispatcher callback is this:
def execute_callbacks(self):
print(f'dispatcher list: {self.callback_list}')
while True:
for callback in self.callback_list:
callback()
I am using pathos.multiprocessing
module in Windows10 and this is a minimal example demonstrating the issue:
import time
import torch
import torchvision
from torchvision import models
from torch.utils import mkldnn as mkldnn_utils
import pathos
class SomeClass():
def __init__(self, model_name='r18', use_jit=False, use_mkldnn=False, device='cpu'):
self.model_name = model_name
self.use_jit = use_jit
self.use_mkldnn = use_mkldnn
self.device = device
self.callback_list = []
self.is_running = False
self._init_model()
def _init_model(self):
if self.model_name == 'r18':
self.model = models.resnet18(pretrained=True)
elif self.model_name == 'r50':
self.model = models.resnet50(pretrained=True)
else:
raise Exception(f"Model name: '{self.model_name}' is not recognized.")
self.model = self.model.to(self.device)
self.model.eval()
if self.use_mkldnn:
self.model = mkldnn_utils.to_mkldnn(self.model)
if self.use_jit:
self.model = self.load_jit_model()
def load_jit_model(self, jit_path = 'model.jit'):
dummy_input = torch.tensor(torch.rand(size=(1, 3, 224, 224)))
model = torch.jit.trace(self.model, dummy_input)
torch.jit.save(model, jit_path)
return torch.jit.load(jit_path)
def add_callback(self, callback):
self.callback_list.append(callback)
def remove_callback(self, callback):
self.callback_list.remove(callback)
def get_callbacks(self):
return self.callback_list
def execute_callbacks(self):
print(f'dispatcher list: {self.callback_list}')
while True:
for callback in self.callback_list:
callback()
def start(self):
self.is_running = True
while self.is_running:
# simulating a generic operation here
time.sleep(0.2)
print('start ended!')
def stop(self):
self.is_running = False
def run_callback_disptacher(self):
pool = pathos.multiprocessing.Pool(1)
pool.map(self.execute_callbacks, [])
print('run_callback_disptacher executed')
and this is how it is called :
import threading
import time
from minimal_example import SomeClass
def simple_callback():
print('hello from simple callback')
def start():
obj = SomeClass(model_name='r18', use_jit=False, use_mkldnn=False, device='cpu')
obj.add_callback(simple_callback)
obj.run_callback_disptacher()
starter = threading.Thread(target=obj.start)
starter.start()
time.sleep(5)
print(obj.get_callbacks())
obj.stop()
print('Done!')
if __name__ == '__main__':
start()
What am I missing here?
Upvotes: 0
Views: 77