Reputation: 1285
I am trying to develop a wrapper for dynamically using the multiprocessing
module. I have a number of functions in various modules that need to be properly managed. I need to be able to pass a function, originating from any module, and its parameters to my wrapper. Both the data and the function will not be known until run-time, as it is user-dependent.
Here is an example of what I am trying to do:
import sys
from multiprocessing import Process, Queue, cpu_count
def dyn():
    """Module-level placeholder, dynamically replaced with a user-supplied function at runtime."""
class mp():
    """Wrapper that fans work out across multiprocessing Processes.

    NOTE(review): this is the broken example from the question.  The
    setattr() trick below works on fork-based platforms, but on Windows
    each child process re-imports this module and therefore sees the
    original module-level ``dyn``, not the replacement.
    """
    def __init__(self, data, func, n_procs = cpu_count()):
        self.data = data        # iterable of work-item batches, one per process
        self.q = Queue()        # children push their result lists here
        self.n_procs = n_procs  # how many result lists items() will wait for
        # Replace module-level 'dyn' function with the provided function
        setattr(sys.modules[__name__], 'dyn', func)
        # Calling dyn(...) at this point will produce the same output as
        # calling func(...)
    def _worker(self, *items):
        # Runs in the child process: apply dyn() to every positional
        # argument and push the collected results onto the shared queue.
        data = []
        for item in items:
            data.append(dyn(item))
        self.q.put(data)
    def compute(self):
        # args=item unpacks each batch (e.g. a range) into the
        # positional parameters of _worker.
        for item in self.data:
            Process(target=getattr(self, '_worker'), args=item).start()
    def items(self):
        # Yield one result list per expected worker; q.get() blocks
        # until a child has produced its output.
        queue_count = self.n_procs
        while queue_count > 0:
            queue_count -= 1
            yield self.q.get()
if __name__ == '__main__':
    # User-defined work function supplied to the wrapper at run time.
    def work(x):
        return x ** 2
    # Create workers: one batch of 10 items per CPU core.
    data = [range(10)] * cpu_count()
    workers = mp(data, work)
    # Make the workers work
    workers.compute()
    # Get the data from the workers
    workers_data = []
    for item in workers.items():
        workers_data.append(item)
    print workers_data  # Python 2 print statement, per the question
For this example, the output should be in this format:
[[0, 1, 4, 9, 16, 25, 36, 49, 64, 81] * n_procs]
If you try running this code you will get an exception stating that too many arguments were passed to `dyn`. I believe the issue is that `dyn` is overwritten for this instance, but when `Process` is called, the changes no longer exist.
How can I get around this issue?
Note - This code needs to run on Windows. I am using Python 2.7.
UPDATE
Based on the comments I was getting, I decided to do something "messy". Below is my working solution:
import sys, re, uuid, os
from cStringIO import StringIO
from multiprocessing import Process, Queue, cpu_count
class mp():
    """Windows-safe wrapper: serialises the user function's source into a
    uniquely named module file so child processes can import it.

    NOTE(review): writes <uuid>.py (and later removes the .py/.pyc) in
    the current working directory; the cleanup in items() only runs if
    the generator is fully exhausted.
    """
    def __init__(self, data, func, n_procs = cpu_count()):
        self.data = data        # iterable of work-item batches, one per process
        self.q = Queue()        # children push their result lists here
        self.n_procs = n_procs  # how many result lists items() will wait for
        # Unique module name so concurrent instances do not collide.
        self.module = 'm' + str(uuid.uuid1()).replace('-', '')
        self.file = self.module + '.py'
        # Build external module
        self.__func_to_module(func)
    def __func_to_module(self, func):
        # func is the *source code* of a function as a string; rewrite
        # its def line so the generated module always exposes work().
        with open(self.file, 'wb') as f:
            for line in StringIO(func):
                if 'def' in line:
                    f.write(re.sub(r'def .*\(', 'def work(', line))
                else:
                    f.write(line)
    def _worker(self, q, module, *items):
        # Child process: import work() from the generated module (this
        # survives the Windows re-import) and apply it to each item.
        exec('from %s import work' % module)
        data = []
        for item in items[0]:
            data.append(work(item))
        q.put(data)
    def compute(self):
        # Queue and module name are passed explicitly so the child does
        # not rely on inherited module state.
        for item in self.data:
            Process(target=getattr(self, '_worker'),
                    args=(self.q, self.module, item)).start()
    def items(self):
        queue_count = self.n_procs
        while queue_count > 0:
            queue_count -= 1
            yield self.q.get()
        # Clean up the generated module source and its compiled .pyc.
        os.remove(self.file)
        os.remove(self.file + 'c')
if __name__ == '__main__':
    # The user function is supplied as source text, not a callable,
    # so it can be written out to the generated module file.
    func = '''def func(x):
    return x ** 2'''
    # Create workers: one batch of 10 items per CPU core.
    data = [range(10)] * cpu_count()
    workers = mp(data, func)
    # Make the workers work
    workers.compute()
    # Get the data from the workers
    workers_data = []
    for item in workers.items():
        workers_data.append(item)
    print workers_data  # Python 2 print statement, per the question
Upvotes: 2
Views: 2305
Reputation: 28684
On Windows, the modules are re-loaded when each new process is started, so the definition of `dyn` is lost. You can, however, pass a function through the queue, or through arguments to the target function of the process.
def _worker(*items, func=None, q=None):
#note that this had better be a function not a method
data = []
for item in items:
data.append(func(item))
q.put(data)
#...
# Example wiring: args=item unpacks the batch into _worker's positional
# parameters; dyn, q and item must be defined in the caller's scope.
Process(target=_worker, args=item, kwargs={'func':dyn, 'q':q})
Upvotes: 1