Reputation: 83
I'm trying to get multiprocessing working with some CPU-bound tasks. However, I cannot figure out how to call a function in a child process, and potentially pass in some arguments, to perform out-of-band tasks. Any help is appreciated.
child.py
import multiprocessing

class Performer(multiprocessing.Process):
    def __init__(self, task_queue):
        super().__init__()
        self.task_queue = task_queue
        self.term_str = "done"

    def set_term_str(self, term_str):
        self.term_str = term_str

    def run(self):
        while True:
            task = self.task_queue.get()
            if task == self.term_str:
                # drain any remaining tasks
                while self.task_queue.qsize() > 0:
                    self.task_queue.get()
            else:
                handle_task(task)  # handle_task is defined elsewhere
parent.py
import multiprocessing
from child import Performer

class Generator(multiprocessing.Process):
    def run(self):
        task_queues = [multiprocessing.Queue(-1) for i in range(5)]
        perfs = []
        for i in range(5):
            perfs.append(Performer(task_queue=task_queues[i]))
            perfs[i].start()
        while True:
            message = get_message()       # get_message, check_message and
            mod = check_message(message)  # get_term_str are defined elsewhere
            if mod != 0:
                term_str = get_term_str(mod, message)
                # trying to change term_str in the running child
                perfs[mod].set_term_str(term_str)
            handle_task(task)

if __name__ == "__main__":
    gen = Generator()
    gen.start()
    gen.join()
The generator communicates with the outside world and needs to change the termination string when required. How can I call functions of another multiprocessing.Process and pass some arguments to alter the execution behavior of that multiprocessing.Process?
Upvotes: 1
Views: 9933
Reputation: 49187
You have two main options:

1. Use Value() or another shared-ctypes object to declare shared memory that is visible to both parent and child. You can share ints or character buffers this way; see http://docs.python.org/library/multiprocessing.html#shared-ctypes-objects. A sketch follows this list.

2. Put the new term_str into the child's task queue as a control message. When the child pops it from the queue, it needs to check the value and update its own copy; a second sketch below shows this.
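For option 1, a minimal sketch. The worker function and the names term_str/worker are illustrative, not from your code; it uses Array from the same shared-ctypes section, since a string needs a fixed-size character buffer:

import multiprocessing
import ctypes
import time

def worker(term_str):
    # the child sees any updates the parent makes to the shared buffer
    while True:
        with term_str.get_lock():
            current = term_str.value
        print("child sees term_str =", current)
        if current == b"stop":
            break
        time.sleep(0.5)

if __name__ == "__main__":
    # shared, fixed-size byte buffer; both processes see the same memory
    term_str = multiprocessing.Array(ctypes.c_char, 32)
    term_str.value = b"done"
    p = multiprocessing.Process(target=worker, args=(term_str,))
    p.start()
    time.sleep(1)
    term_str.value = b"stop"   # parent changes the value; the child picks it up
    p.join()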
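For option 2, a sketch along the same lines (again with illustrative names): the parent sends a small control message through the same queue the tasks travel on, and the child updates its own term_str when it sees one:

import multiprocessing

def worker(task_queue):
    term_str = "done"
    while True:
        task = task_queue.get()
        # control message: a tuple updates the termination string inside the child
        if isinstance(task, tuple) and task[0] == "set_term_str":
            term_str = task[1]
        elif task == term_str:
            break
        else:
            print("handling", task)

if __name__ == "__main__":
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=worker, args=(q,))
    p.start()
    q.put("work item 1")
    q.put(("set_term_str", "finished"))  # change the sentinel from the parent
    q.put("work item 2")
    q.put("finished")                    # child now terminates on the new string
    p.join()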
BTW, for what you want, Python already provides a great mechanism: a pool of worker processes; see http://docs.python.org/library/multiprocessing.html#using-a-pool-of-workers
from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    pool = Pool(processes=4)            # start 4 worker processes
    result = pool.apply_async(f, [10])  # evaluate "f(10)" asynchronously
    print(result.get(timeout=1))        # prints "100" unless your computer is *very* slow
    print(pool.map(f, range(10)))       # prints "[0, 1, 4,..., 81]"
Upvotes: 2