Felipe Moser

Reputation: 365

Simplest way to multithread using class variables?

I'm trying to implement a function in my class that calculates information from array A and outputs the result in array B. Array A and array B are both variables of a class, as is the function. Something along these lines:

import numpy as np

class MyClass:
    def __init__(self, A):
        self.A = A
        self.B = np.zeros((A.shape[0], A.shape[1]))

    def my_function(self, i):
        self.B += self.A[:, :, i]

    def main(self):
        for i in range(self.A.shape[2]):
            self.my_function(i)

example = np.random.rand(256, 256, 1000)
my_class = MyClass(example)
my_class.main()
my_result = my_class.B

Obviously this function is oversimplified, but the question is about how to use multiprocessing with the variables self.A and self.B. I've tried something like this, but it didn't work at all:

import multiprocessing

class MyClass:
    def __init__(self, A):
        self.A = A
        self.B = np.zeros((A.shape[0], A.shape[1]))

    def my_function(self, i):
        return self.A[:, :, i]

    def main(self):
        with multiprocessing.Pool() as p:
            position = range(self.A.shape[2])
            for i, result in enumerate(p.map(my_function, position)):
                self.B += result

Upvotes: 0

Views: 30

Answers (1)

bivouac0

Reputation: 2570

You can get your example code to work by doing something like...

import time
import numpy as np
from multiprocessing import Pool

class MyClass:
    def __init__(self, A):
        self.A = A
        self.B = np.zeros((A.shape[0], A.shape[1]))

    def my_function(self, i):
        return self.A[:, :, i]

    def run(self):
        with Pool() as p:
            position = range(self.A.shape[2])
            for result in p.imap(self.my_function, position, chunksize=self.A.shape[2]):
                self.B += result

if __name__ == '__main__':   # guard so pool workers don't re-run this on import
    example = np.random.rand(256, 256, 1000)
    my_class = MyClass(example)
    st = time.time()
    my_class.run()
    print(time.time() - st)

The problem with multiprocessing is that it has to fork new processes and then serialize (via pickle) the data going into and out of them. For simple code like this, that overhead costs far more than the actual work the function does.
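To see how lopsided that ratio is here, you can time the plain single-process loop as a baseline. A minimal sketch (the absolute numbers will vary by machine):

import time
import numpy as np

example = np.random.rand(256, 256, 1000)
B = np.zeros((example.shape[0], example.shape[1]))

# Serial baseline: no forking and no pickling, just the Python loop.
st = time.time()
for i in range(example.shape[2]):
    B += example[:, :, i]
print('serial loop:', time.time() - st)

# For this particular function NumPy can do the whole reduction in C,
# which is faster still.
st = time.time()
B2 = example.sum(axis=2)
print('numpy sum:', time.time() - st)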

Setting chunksize to the size of your iterable is just a way to make sure Python ships all the tasks to a worker in a single pickled batch instead of dispatching them one at a time, which cuts down the overhead. For this example the multiprocessed code is still slower than doing it in a single process, but if you have a more complex function, the MP version could come out faster.
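If you want to see the effect directly, you can sweep chunksize. A rough sketch, assuming a fork-based start method (the Linux default) so the workers inherit data; exact timings depend on your machine:

import time
import numpy as np
from multiprocessing import Pool

data = np.random.rand(256, 256, 1000)

def my_function(i):
    return data[:, :, i]

if __name__ == '__main__':
    for chunk in (1, 100, data.shape[2]):
        st = time.time()
        with Pool() as p:
            B = np.zeros((data.shape[0], data.shape[1]))
            for result in p.imap(my_function, range(data.shape[2]), chunksize=chunk):
                B += result
        print('chunksize', chunk, ':', time.time() - st)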

As a general rule, I try never to put the multiprocessed function/data inside the class. Passing a bound method like self.my_function to the pool means the whole instance, including self.A, has to be pickled and unpickled for the workers, which adds a lot of overhead. You can move your function outside with something like...

import time
import numpy as np
from multiprocessing import Pool

# Simple global data / function
data = None

def my_function(i):
    global data
    return data[:, :, i]

class MyClass:
    def __init__(self, A):
        global data
        data   = A   # move class data to global
        self.A = A
        self.B = np.zeros((A.shape[0], A.shape[1]))

    def run(self):
        with Pool() as p:
            position = range(self.A.shape[2])
            # With a fork-based start method (the Linux default) the workers
            # inherit the global `data`, so only the small index ints are
            # pickled going in; each result slice still comes back via pickle.
            for result in p.imap(my_function, position, chunksize=self.A.shape[2]):
                self.B += result

if __name__ == '__main__':
    example = np.random.rand(256, 256, 1000)
    my_class = MyClass(example)
    st = time.time()
    my_class.run()
    print(time.time() - st)

For a simple function like this, multiprocessing will still be slower, but if your actual function does substantial work per call, this approach can speed things up.
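For instance, if each slice needed real numerical work, the compute per task can start to outweigh the fork/pickle cost. A hypothetical sketch, using an SVD purely as a stand-in for "an expensive function" (heavy_function and the chunksize of 10 are illustrative choices, not from the original):

import numpy as np
from multiprocessing import Pool

data = None

def heavy_function(i):
    # Stand-in for an expensive per-slice computation.
    u, s, vt = np.linalg.svd(data[:, :, i])
    return (u * s) @ vt

class MyClass:
    def __init__(self, A):
        global data
        data = A
        self.A = A
        self.B = np.zeros((A.shape[0], A.shape[1]))

    def run(self):
        with Pool() as p:
            # Smaller chunks make sense here: each task is expensive, so
            # spreading tasks across workers matters more than minimizing
            # pickling round-trips.
            for result in p.imap(heavy_function, range(self.A.shape[2]), chunksize=10):
                self.B += result

if __name__ == '__main__':
    my_class = MyClass(np.random.rand(256, 256, 100))
    my_class.run()

Whether this version actually wins depends on how much of the work your BLAS library already parallelizes internally (np.linalg.svd often uses multiple threads on its own), so it is worth timing both versions before committing.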

Upvotes: 1
