Reputation: 25
I am trying to parallelize my code using the multiprocessing module. The code I am working on works in two steps. In the first step I initialize a class, which calculates and holds several variables that are used in the second step. In the second step the program performs calculations using the previously initialized variables. The variables of the first step are not modified in any way. The calculation time of the first step is not important, but that of the second step is, because it is called a few hundred times in a necessarily sequential order. Below is a constructed minimal example of the code structure and its output.
import numpy as np
import time
from multiprocessing import Pool

class test:
    def __init__(self):
        self.r = np.ones(10000000)

    def f(self, init):
        summed = 0
        for i in range(0, init):
            summed = summed + i
        return summed

if __name__ == "__main__":
    # first step
    func = test()

    # second step
    # sequential
    start_time = time.time()
    for i in [1000000, 1000000, 1000000, 1000000]:
        func.f(i)
    print('Sequential: ', time.time()-start_time)

    # parallel
    start_time = time.time()
    pool = Pool(processes=None)
    result = pool.starmap(func.f, [[1000000], [1000000], [1000000], [1000000]])
    print('Parallel: ', time.time()-start_time)
Output:
Sequential: 0.2673146724700928
Parallel: 1.5638213157653809
As I understand it, multiprocessing becomes slower because the variable r of the class test has to be transferred to all of the worker processes. To circumvent this I would need to initialize the class on each worker before starting f. Is this possible with multiprocessing? Are there other tools for doing this?
Upvotes: 0
Views: 253
Reputation: 25
I have solved the issue by using the Pipe function from the multiprocessing module. In the first step I can now initialize my variables and set up a multiprocessing environment. Then I use the Pipe connections to transfer the input data.
For "self.r = np.ones(100000000)"
Parallel piped: 0.8008558750152588
Parallel 2: 18.51273012161255
For "self.r = np.ones(10000000)"
Parallel piped: 0.71409010887146
Parallel 2: 1.4551067352294922
import numpy as np
import time
import multiprocessing as mp

class Test:  # PEP8: `CamelCaseNames` for classes
    def __init__(self):
        self.r = np.ones(100000000)

    def f(self, init):
        summed = 0
        for i in range(init):
            summed = summed + i
        return summed

def my_function(value):
    func = Test()
    return func.f(value)

class Connection:
    def __init__(self):
        self.process = {}
        self.parent = {}
        self.child = {}

    def add(self, hub, process, parent_conn, child_conn):
        self.process[hub] = process
        self.parent[hub] = parent_conn
        self.child[hub] = child_conn

def multi_run(child_conn, func, i):
    while True:
        init = child_conn.recv()
        data = func.f(init)
        child_conn.send(data)

if __name__ == "__main__":
    N_processes = 4
    func = Test()
    conn = Connection()

    # First step
    for i in range(N_processes):
        parent_conn, child_conn = mp.Pipe()
        process = mp.Process(target=multi_run, args=(child_conn, func, i))
        process.daemon = True  # workers loop forever, so let them die with the main process
        conn.add(i, process, parent_conn, child_conn)
        process.start()

    start_time = time.time()
    data = [[1000000, x] for x in range(30)]

    # Second step
    for i, j in data:
        conn.parent[j % N_processes].send(i)
    for i, j in data:
        conn.parent[j % N_processes].recv()
    print('Parallel piped:', time.time()-start_time)

    data = [[1000000] for x in range(30)]

    # parallel 2
    start_time = time.time()
    pool = mp.Pool(processes=None)
    result = pool.starmap(my_function, data)
    print('Parallel 2:', time.time()-start_time)
Upvotes: 1
Reputation: 142631
Simply create a function
def my_function(value):
    func = Test()
    return func.f(value)
or even
def my_function(value):
    return Test().f(value)
and use it
result = pool.starmap(my_function, [[1000000], [1000000], [1000000], [1000000]])
multiprocessing doesn't work with lambdas,
so you can't use
pool.starmap(lambda value: Test().f(value), ...)
It probably doesn't work with functools.partial() either,
so you can't use that instead of a lambda.
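(The lambda limitation comes from pickling: Pool sends the target callable to the workers via pickle, module-level functions like my_function are pickled by their qualified name, and lambdas have no importable name, so pickling them raises an error. A quick check of my own:)

```python
import pickle

# Pickling a lambda fails, which is why pool.starmap(lambda ..., ...)
# cannot ship the callable to the worker processes.
try:
    pickle.dumps(lambda value: value * 2)
    lambda_picklable = True
except (pickle.PicklingError, AttributeError, TypeError):
    lambda_picklable = False

print(lambda_picklable)  # False
```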
Minimal working example
import numpy as np
import time
from multiprocessing import Pool

class Test:  # PEP8: `CamelCaseNames` for classes
    def __init__(self):
        self.r = np.ones(10000000)

    def f(self, init):
        summed = 0
        for i in range(init):
            summed = summed + i
        return summed

def my_function(value):
    func = Test()
    return func.f(value)

if __name__ == "__main__":
    data = [[1000000] for x in range(30)]

    # first step
    func = Test()

    # second step
    # sequential
    start_time = time.time()
    for i in data:
        func.f(*i)  # `*i` like in starmap
    print('Sequential:', time.time()-start_time)

    # parallel 1
    start_time = time.time()
    pool = Pool(processes=None)
    result = pool.starmap(func.f, data)
    print('Parallel 1:', time.time()-start_time)

    # parallel 2
    start_time = time.time()
    pool = Pool(processes=None)
    result = pool.starmap(my_function, data)
    print('Parallel 2:', time.time()-start_time)
My results:
Sequential: 3.0593459606170654
Parallel 1: 5.2161490917205810
Parallel 2: 1.8350131511688232
Upvotes: 0