Reputation: 1081
I'm trying to use multiprocessing to run multiple background jobs while the main process acts as a user interface that accepts commands through input(). Each process has to do certain jobs and writes its current status into a dictionary, which was created with manager.dict() and then passed to the Process.
After the processes are created, there is a loop with an input() call that reads the user commands. The commands are reduced to a minimum here for simplicity.
from multiprocessing import Manager
from multiprocessing import Process

with Manager() as manager:
    producers = []
    settings = [{'name':'test'}]
    for setting in settings:
        status = manager.dict()
        logger.info("Start Producer {0}".format(setting['name']))
        producer = Process(target=start_producer, args=(setting, status))
        producer.start()
        producers.append([producer, status])
    logger.info("initialized {0} producers".format(len(producers)))
    while True:
        text_command = input('Enter your command:')
        if text_command == 'exit':
            logger.info("waiting for producers")
            for p in producers:
                p[0].join()
            logger.info("Exit application.")
            break
        elif text_command == 'status':
            for p in producers:
                if 'name' in p[1] and 'status' in p[1]:
                    print('{0}:{1}'.format(p[1]['name'], p[1]['status']))
        else:
            print("Unknown command.")
The method which runs in other processes is pretty simple:
def start_producer(producer_setting: dict, status_dict: dict):
    importer = MyProducer(producer_setting)
    importer.set_status_dict(status_dict)
    importer.run()
I create a MyProducer instance, set the status dictionary through a setter on the object, and call the blocking run() method, which only returns when the producer is finished. When set_status_dict(status_dict) is called, the dictionary is filled with a name and a status element.
When I run the code, the producer seems to get created: I receive the "Start Producer test" and "initialized 1 producers" output and after that the "Enter your command" prompt from input(), but it seems that the actual process doesn't run.
When I press enter to skip the first loop iteration, I get the expected "unknown command" log and the producer-process begins the actual work. After that my "status" command also works as expected.
When I enter 'status' in the first loop iteration, I get a KeyError, because 'name' and 'status' are not set in the dictionary. Those keys should get set in set_status_dict(), which itself is called inside the function passed as Process(target=...).
Why is that? Shouldn't producer.start() run the complete block of start_producer inside a new process and therefore never hang on the input() of the main process?
How can I start the processes first without any user input and only then wait for input()?
Edit: A complete MCVE program with this problem can be found here: https://pastebin.com/k8xvhLhn
Edit: A solution with sleep(1) after initializing the processes has been found. But why does that behavior happen in the first place? Shouldn't all code in start_producer() run in a new process?
Upvotes: 0
Views: 366
Reputation: 23753
I have limited experience with the multiprocessing module, but I was able to get it to behave the way (I think) you want. First I added some print statements at the top of the while loop to see what was going on and found that it worked if the process was run or joined. I figured you didn't want it to block, so I added the call to run() further up, but it appears that run() also blocks (it executes the target in the calling process rather than in a new one). It turns out that the process just hadn't finished when the first while loop iteration came around: adding time.sleep(30) at the top of the loop gave the process enough time to get scheduled (by the OS) and run. (On my machine it actually only needs between 200 and 300 milliseconds of nap time.)
I replaced start_producer with:
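If you want to see that timing for yourself, here is a minimal sketch. The helpers fill_status and wait_for_keys are hypothetical names I made up for illustration, not part of your code; fill_status only stands in for what set_status_dict() does. It starts one process, checks the shared dictionary right away, and then polls until both keys show up. The first check will often, though not always, report that the keys are missing, because start() returns before the child has had a chance to run.

```python
import time
from multiprocessing import Manager, Process


def fill_status(status_dict):
    # Hypothetical stand-in for the producer's set_status_dict().
    status_dict['name'] = 'foo'
    status_dict['status'] = 'thinking'


def wait_for_keys(status_dict, keys=('name', 'status'), timeout=10.0, poll=0.01):
    """Poll until all keys exist; return elapsed seconds, or None on timeout."""
    start = time.monotonic()
    while time.monotonic() - start < timeout:
        if all(k in status_dict for k in keys):
            return time.monotonic() - start
        time.sleep(poll)
    return None


if __name__ == '__main__':
    with Manager() as manager:
        status = manager.dict()
        p = Process(target=fill_status, args=(status,))
        p.start()
        # start() only launches the child; it does not wait for the target to run.
        print('keys present right after start():', 'name' in status)
        print('seconds until keys appeared:', wait_for_keys(status))
        p.join()
```

The measured delay depends on the machine and on the start method (spawning a fresh interpreter generally takes longer than forking), so treat the number as a rough indication only.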
def start_producer(producer_setting: dict, status_dict: dict):
##    importer = MyProducer(producer_setting)
##    importer.set_status_dict(status_dict)
##    importer.run()
    #time.sleep(30)
    status_dict['name'] = 'foo'
    status_dict['status'] = 'thinking'
Your code modified:
if __name__ == '__main__':
    with Manager() as manager:
        producers = []
        settings = [{'name':'test'}]
        for setting in settings:
            status = manager.dict()
            logger.info("Start Producer {0}".format(setting['name']))
            producer = Process(target=start_producer, args=(setting, status))
            producer.start()
            # add a call to run() but it blocks
            #producer.run()
            producers.append([producer, status])
        logger.info("initialized {0} producers".format(len(producers)))
        while True:
            time.sleep(30)
            for p, s in producers:
                #p.join()
                #p.run()
                print(f'name:{p.name}|alive:{p.is_alive()}|{s}')
                if 'name' in s and 'status' in s:
                    print('{0}:{1}'.format(s['name'], s['status']))
            text_command = input('Enter your command:')
            if text_command == 'exit':
                logger.info("waiting for producers")
                for p in producers:
                    p[0].join()
                logger.info("Exit application.")
                break
            elif text_command == 'status':
                for p in producers:
                    if 'name' in p[1] and 'status' in p[1]:
                        print('{0}:{1}'.format(p[1]['name'], p[1]['status']))
            else:
                print("Unknown command.")
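If you would rather not guess a sleep duration, one possible alternative is to have each producer signal when its status is filled in, and let the main process wait for that signal before it reaches input(). The sketch below uses multiprocessing.Event for this. It is only an illustration under assumptions: the extra ready parameter and the start_all helper are my own additions, and the body of start_producer is a stand-in for MyProducer, which I don't have.

```python
from multiprocessing import Event, Manager, Process


def start_producer(producer_setting, status_dict, ready):
    # Stand-in for MyProducer: fill in the status, then signal readiness.
    status_dict['name'] = producer_setting['name']
    status_dict['status'] = 'running'
    ready.set()
    # The blocking work (importer.run() in the question) would go here.


def start_all(manager, settings, timeout=10.0):
    """Start one process per setting and wait until each has signalled."""
    producers = []
    for setting in settings:
        status = manager.dict()
        ready = Event()  # hypothetical per-producer "status is filled" flag
        proc = Process(target=start_producer, args=(setting, status, ready))
        proc.start()
        producers.append((proc, status, ready))
    for proc, status, ready in producers:
        # wait() returns False if the timeout expires before set() is called.
        if not ready.wait(timeout):
            raise RuntimeError('{0} did not become ready'.format(proc.name))
    return producers


if __name__ == '__main__':
    with Manager() as manager:
        producers = start_all(manager, [{'name': 'test'}])
        # At this point every status dict has its keys; input() could follow.
        for proc, status, _ in producers:
            print('{0}:{1}'.format(status['name'], status['status']))
        for proc, _, _ in producers:
            proc.join()
```

All processes are started first and only then waited on, so the startup delays overlap instead of adding up. The timeout keeps the main process from hanging forever if a producer crashes before it sets the event.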
Upvotes: 1