Sergej

Reputation: 25

Python Multiprocessing, preinitialization of variables

I am trying to parallelize my code using the multiprocessing module. The code works in two steps. In the first step I initialize a class, which calculates and holds several variables that are used in the second step. In the second step the program performs calculations using the previously initialized variables; these variables are not modified in any way. The calculation time of the first step does not matter, but the second step does, because it is called a few hundred times in a necessarily sequential order. Below is a constructed minimal example of the code structure and its output.

import numpy as np
import time
from multiprocessing import Pool

class test:
    def __init__(self):
        self.r = np.ones(10000000)


    def f(self,init):
        summed = 0
        for i in range(0,init):
            summed = summed + i
        return summed


if __name__ == "__main__":
    # first step 
    func = test()
    
    
    # second step
    # sequential
    start_time = time.time()
    for i in [1000000,1000000,1000000,1000000]:
        func.f(i)
    print('Sequential: ', time.time()-start_time)

    
    # parallel
    start_time = time.time()
    pool = Pool(processes=None)
    result = pool.starmap(func.f,[[1000000],[1000000],[1000000],[1000000]])
    print('Parallel: ', time.time()-start_time)

Output:
Sequential: 0.2673146724700928
Parallel: 1.5638213157653809

As I understand it, multiprocessing becomes slower here because the variable r of the class test has to be transferred to every worker process. To circumvent this I would need to initialize the class on each worker once, before f is ever called. Is this possible with multiprocessing? Are there other tools for doing this?
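For reference, multiprocessing.Pool accepts initializer/initargs arguments, and the initializer runs once in every worker process when the pool starts, which allows exactly this kind of per-worker setup. Below is a minimal sketch of that pattern; the names init_worker, worker_obj and work are illustrative, not part of the question's code.

import numpy as np
from multiprocessing import Pool

class Test:
    def __init__(self):
        self.r = np.ones(10000000)

    def f(self, init):
        return sum(range(init))

worker_obj = None  # filled in separately inside every worker process

def init_worker():
    # Runs once per worker process: the large object is built locally
    # instead of being pickled and transferred for every task.
    global worker_obj
    worker_obj = Test()

def work(value):
    return worker_obj.f(value)

if __name__ == "__main__":
    with Pool(initializer=init_worker) as pool:
        print(pool.starmap(work, [[1000000]] * 4))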

Upvotes: 0

Views: 253

Answers (2)

Sergej

Reputation: 25

I have solved the issue by using the Pipe function from the multiprocessing module. In the first step I can now initialize my variables and set up a multiprocessing environment. Then I use the pipes to transfer the input data: each worker receives the initialized object once, when its process starts, and afterwards only the small inputs and results travel through the pipes.
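For readers unfamiliar with Pipe, the basic round-trip that the code below builds on looks like this (a minimal standalone sketch, not part of the original solution):

import multiprocessing as mp

def worker(conn):
    value = conn.recv()        # block until the parent sends an input
    conn.send(value * 2)       # send the result back over the same pipe

if __name__ == "__main__":
    parent_conn, child_conn = mp.Pipe()
    process = mp.Process(target=worker, args=(child_conn,))
    process.start()
    parent_conn.send(21)
    print(parent_conn.recv())  # -> 42
    process.join()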

For "self.r = np.ones(100000000)"
Parallel piped: 0.8008558750152588
Parallel 2: 18.51273012161255

For "self.r = np.ones(10000000)"
Parallel piped: 0.71409010887146 Parallel 2: 1.4551067352294922

import numpy as np
import time
import multiprocessing as mp


class Test:  # PEP8: `CamelCaseNames` for classes
    def __init__(self):
        self.r = np.ones(100000000)

    def f(self, init):
        summed = 0
        for i in range(init):
            summed = summed + i
        return summed


def my_function(value):
    func = Test()
    return func.f(value)


class Connection:
    # Bookkeeping helper: keeps the process and both pipe ends per worker.
    def __init__(self):
        self.process = {}
        self.parent = {}
        self.child = {}

    def add(self, hub, process, parent_conn, child_conn):
        self.process[hub] = process
        self.parent[hub] = parent_conn
        self.child[hub] = child_conn


def multi_run(child_conn, func, i):
    # Worker loop: block for an input, run the calculation on the locally
    # held object, and send the result back through the pipe.
    while True:
        init = child_conn.recv()
        data = func.f(init)
        child_conn.send(data)


if __name__ == "__main__":
    N_processes = 4

    func = Test()
    conn = Connection()
    # First step
    for i in range(N_processes):
        parent_conn, child_conn = mp.Pipe()
        # daemon=True: the workers loop forever, so they must not keep
        # the script alive once the main process is done.
        process = mp.Process(target=multi_run, args=(child_conn, func, i),
                             daemon=True)
        conn.add(i, process, parent_conn, child_conn)
        process.start()

    start_time = time.time()
    data = [[1000000, x] for x in range(30)]
    # Second step
    for i, j in data:
        conn.parent[j % N_processes].send(i)    # hand out all inputs first
    for i, j in data:
        conn.parent[j % N_processes].recv()     # then collect all results
    print('Parallel piped:', time.time()-start_time)

    data = [[1000000] for x in range(30)]
    # parallel 2
    start_time = time.time()
    with mp.Pool(processes=None) as pool:
        result = pool.starmap(my_function, data)
        print('Parallel 2:', time.time()-start_time)

Upvotes: 1

furas

Reputation: 142631

Simply create a function

def my_function(value):
    func = Test()
    return func.f(value)

or even

def my_function(value):
    return Test().f(value)

and use it

result = pool.starmap(my_function, [[1000000],[1000000],[1000000],[1000000]])

Multiprocessing doesn't work with lambda (a lambda can't be pickled), so you can't use

pool.starmap(lambda value:Test().f(value), ...)

It probably doesn't work with functools.partial() either, so you can't use that instead of a lambda.
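A quick way to check the lambda limitation yourself; this is a minimal sketch, and the exact exception type and message vary between Python versions:

from multiprocessing import Pool

if __name__ == "__main__":
    with Pool() as pool:
        try:
            pool.starmap(lambda value: value * 2, [[1], [2]])
        except Exception as e:  # typically a pickling error
            print(type(e).__name__, e)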


Minimal working example

import numpy as np
import time
from multiprocessing import Pool

class Test:  # PEP8: `CamelCaseNames` for classes
    
    def __init__(self):
        self.r = np.ones(10000000)

    def f(self, init):
        summed = 0
        for i in range(init):
            summed = summed + i
        return summed

def my_function(value):
    func = Test()
    return func.f(value)

if __name__ == "__main__":

    data = [[1000000] for x in range(30)]

    # first step 
    func = Test()
    
    # second step
    # sequential
    start_time = time.time()
    for i in data:
        func.f(*i)   # `*i` like in starmap
    print('Sequential:', time.time()-start_time)
    
    # parallel 1 - pool.starmap pickles the bound method func.f, which
    # drags the whole Test instance (big array included) along for every
    # task, so it can be even slower than the sequential run
    start_time = time.time()
    with Pool(processes=None) as pool:
        result = pool.starmap(func.f, data)
        print('Parallel 1:', time.time()-start_time)

    # parallel 2 - each task creates its own Test inside the worker, so
    # the big array never has to be transferred between processes
    start_time = time.time()
    with Pool(processes=None) as pool:
        result = pool.starmap(my_function, data)
        print('Parallel 2:', time.time()-start_time)
    

My results:

Sequential: 3.0593459606170654
Parallel 1: 5.2161490917205810
Parallel 2: 1.8350131511688232

Upvotes: 0
