James Mnatzaganian

Reputation: 1285

Python - Dynamic usage of multiprocessing module

I am trying to develop a wrapper for using the multiprocessing module dynamically. I have a number of functions in various modules that need to be properly managed. I need to be able to pass a function, originating from any module, along with its parameters, to my wrapper. Neither the data nor the function is known until run-time, as both are user-dependent.

Here is an example of what I am trying to do:

import sys
from multiprocessing import Process, Queue, cpu_count

def dyn():
    pass

class mp():
    def __init__(self, data, func, n_procs = cpu_count()):
        self.data    = data
        self.q       = Queue()
        self.n_procs = n_procs

        # Replace module-level 'dyn' function with the provided function
        setattr(sys.modules[__name__], 'dyn', func)
        # Calling dyn(...) at this point will produce the same output as
        # calling func(...)

    def _worker(self, *items):
        data = []
        for item in items:
            data.append(dyn(item))
        self.q.put(data)

    def compute(self):
        for item in self.data:
            Process(target=getattr(self, '_worker'), args=item).start()

    def items(self):
        queue_count = self.n_procs
        while queue_count > 0:
            queue_count -= 1
            yield self.q.get()

if __name__ == '__main__':  
    def work(x):
        return x ** 2

    # Create workers
    data = [range(10)] * cpu_count()
    workers = mp(data, work)

    # Make the workers work
    workers.compute()

    # Get the data from the workers
    workers_data = []
    for item in workers.items():
        workers_data.append(item)
    print workers_data

For this example, the output should be in this format (n_procs copies of the same list):

[[0, 1, 4, 9, 16, 25, 36, 49, 64, 81] * n_procs]

If you try running this code you will get an exception stating that too many arguments were passed to dyn. I believe the issue is that dyn is overwritten within this instance, but that when Process starts, the replacement no longer exists.
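To illustrate what I believe is happening, here is a minimal sketch (the show helper and the strings are purely illustrative): on Windows the child process re-imports the module, so the patched dyn reverts to the original.

import sys
from multiprocessing import Process

def dyn():
    return 'original'

def show():
    print dyn()  # on Windows this prints 'original': the patch is gone

if __name__ == '__main__':
    # Patch the module-level 'dyn', exactly as the wrapper above does
    setattr(sys.modules[__name__], 'dyn', lambda: 'patched')
    print dyn()  # prints 'patched' in the parent
    p = Process(target=show)
    p.start()
    p.join()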

How can I get around this issue?

Note - This code needs to run on Windows. I am using Python 2.7.

UPDATE

Based on the comments I was getting, I decided to do something "messy": writing the function's source to a uniquely named module file, which the child process can then import by name (on Windows the target must be importable). Below is my working solution:

import sys, re, uuid, os
from cStringIO import StringIO
from multiprocessing import Process, Queue, cpu_count

class mp():
    def __init__(self, data, func, n_procs = cpu_count()):
        self.data    = data
        self.q       = Queue()
        self.n_procs = n_procs
        self.module  = 'm' + str(uuid.uuid1()).replace('-', '')
        self.file    = self.module + '.py'

        # Build external module
        self.__func_to_module(func)

    def __func_to_module(self, func):
        # Write the function's source to a new module file, renaming the
        # function to 'work' so the worker can import it under a fixed name
        with open(self.file, 'wb') as f:
            for line in StringIO(func):
                if 'def' in line:
                    f.write(re.sub(r'def .*\(', 'def work(', line))
                else:
                    f.write(line)

    def _worker(self, q, module, *items):
        # Import the freshly generated module inside the child process
        exec('from %s import work' % module)
        data = []
        for item in items[0]:
            data.append(work(item))
        q.put(data)

    def compute(self):
        for item in self.data:
            Process(target=getattr(self, '_worker'),
                args=(self.q, self.module, item)).start()

    def items(self):
        queue_count = self.n_procs
        while queue_count > 0:
            queue_count -= 1
            yield self.q.get()
        # Clean up the generated module and its compiled .pyc file
        os.remove(self.file)
        os.remove(self.file + 'c')

if __name__ == '__main__':  
    func = '''def func(x):
        return x ** 2'''

    # Create workers
    data = [range(10)] * cpu_count()
    workers = mp(data, func)

    # Make the workers work
    workers.compute()

    # Get the data from the workers
    workers_data = []
    for item in workers.items():
        workers_data.append(item)
    print workers_data
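For reference, with the example above, __func_to_module writes a file named like m<uuid>.py whose entire contents are:

def work(x):
        return x ** 2

Because this module exists on disk, the child process spawned on Windows can import it by name, which is what makes the dynamically supplied function reachable again.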

Upvotes: 2

Views: 2305

Answers (1)

mdurant

Reputation: 28684

On Windows, modules are reloaded when each new process is started, so the definition of dyn is lost. You can, however, pass a function through the queue, or through the arguments of the target function of the process.

def _worker(func, q, *items):
    # note that this had better be a module-level function, not a method
    data = []
    for item in items:
        data.append(func(item))
    q.put(data)

#...
Process(target=_worker, args=(dyn, q) + tuple(item)).start()
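For example, here is a fuller sketch of the same idea (square and the layout are illustrative): the function travels as an ordinary argument, so it must be picklable, which in practice means it must be defined at module level so the Windows child can re-import it by name.

from multiprocessing import Process, Queue, cpu_count

def square(x):  # module-level, so it can be pickled by reference
    return x ** 2

def _worker(func, q, items):
    q.put([func(item) for item in items])

if __name__ == '__main__':
    q = Queue()
    procs = [Process(target=_worker, args=(square, q, range(10)))
             for _ in range(cpu_count())]
    for p in procs:
        p.start()
    print [q.get() for _ in procs]  # drain the queue before joining
    for p in procs:
        p.join()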

Upvotes: 1
