I'm trying to use Python's multiprocessing map function. I have placed the map call inside a sub-function, because I need to loop through a larger dataset, divide it up, and call map on the smaller chunks.
My problem is that the time.sleep(5) line is being called multiple times and 'Test!' is printed 5 times (apparently once at the start, then 2*2 more, i.e. number of loops * number of processes), even though those lines sit at a higher level than the multiprocessing calls. At the same time, the CSV output is what I expect, so runParallel() is running correctly and is being called the expected number of times.
from multiprocessing import Pool
import numpy as np
import os,csv,copy,time
from AuxFuncs import *

def master():
    time.sleep(5)
    print('Test!')
    for mult in [1,10]:
        runParallel(mult)

def runParallel(mult):
    randIntInputs = list()
    for i in range(5): randIntInputs.append((np.random.randint(10)*mult,mult))
    if __name__=='__main__':
        p = Pool(processes=2)
        results = p.map(testFunc,randIntInputs)
        p.close()
        p.join()
        valsToSave = [list(result[0]) for result in results]
        write2dListToCSV(valsToSave,'output' + str(mult) + '.csv')

def testFunc(inputs):
    return np.random.randint(1,10,5) * inputs[0],inputs[1]

master()
And the output is:
Test!
Test!
Test!
Test!
Test!
I thought the problem might be that I put the Pool call in a function, but even if I move it out of a function, I have the same issue ("Test!" is printed 3 times by the code below).
from multiprocessing import Pool
import numpy as np
import os,csv,copy,time
from AuxFuncs import *

def testFunc(inputs):
    return np.random.randint(1,10,5) * inputs[0],inputs[1]

print('Test!')
mult,randIntInputs = 5,list()
for i in range(5): randIntInputs.append((np.random.randint(10)*mult,mult))
if __name__=='__main__':
    p = Pool(processes=2)
    results = p.map(testFunc,randIntInputs)
    p.close()
    p.join()
    valsToSave = [list(result[0]) for result in results]
    write2dListToCSV(valsToSave,'output' + str(mult) + '.csv')
EDIT: Thanks for the help. Looks like this works:
from multiprocessing import Pool
import numpy as np
import os,csv,copy,time
from AuxFuncs import *

def master():
    if __name__=='__main__':
        time.sleep(5)
        print('Test!')
        for mult in [1,10]:
            runParallel(mult)

def runParallel(mult):
    randIntInputs = list()
    for i in range(5): randIntInputs.append((np.random.randint(10)*mult,mult))
    # if __name__=='__main__':
    p = Pool(processes=2)
    results = p.map(testFunc,randIntInputs)
    p.close()
    p.join()
    valsToSave = [list(result[0]) for result in results]
    write2dListToCSV(valsToSave,'output' + str(mult) + '.csv')

def testFunc(inputs):
    return np.random.randint(1,10,5) * inputs[0],inputs[1]

master()
What's probably happening here is that each child process imports your main script in order to find the function you're calling. (This is how the spawn start method works; spawn is the default on Windows and, since Python 3.8, on macOS.) When that happens, the child runs every module-level function call that is not shielded by an if, including your call to master. Putting the if __name__ ... inside a definition means you can no longer use it to shield other operations. What I think you're going for looks more like this:
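from multiprocessing import Pool
import time
import numpy as np
from AuxFuncs import *  # the asker's own module, provides write2dListToCSV

def master():
    time.sleep(5)
    print('Test!')
    for mult in range(1, 11):
        runParallel(mult)

def runParallel(mult):
    randIntInputs = list()
    for i in range(5): randIntInputs.append((np.random.randint(10)*mult,mult))
    # The with block shuts the pool down automatically when it exits
    with Pool(processes=2) as p:
        results = p.map(testFunc,randIntInputs)
    valsToSave = [list(result[0]) for result in results]
    write2dListToCSV(valsToSave,'output' + str(mult) + '.csv')

def testFunc(inputs):
    return np.random.randint(1,10,5) * inputs[0],inputs[1]

# Only the parent process runs master(); children importing this file skip it
if __name__ == '__main__':
    master()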
The difference between this and your latest update is that, in your update, master is still called in each child process; it just doesn't do anything, because the if statement doesn't evaluate to True there. In this code, master is called only once, and every later attempt is blocked by the if statement. The difference isn't huge, but this version is more practical: the guard sits at the single entry point, so it also shields any other top-level code you add later.
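In CPython, the spawn start method re-runs the parent's main script in each child via runpy.run_path under the module name __mp_main__, so __name__ == '__main__' is false there. The sketch below imitates only that step, without starting any processes: it runs two small hypothetical scripts (one per guard placement compared above) with runpy.run_path under both names and captures what they print. It is a simulation under that assumption, not multiprocessing itself.

```python
import contextlib
import io
import os
import runpy
import tempfile
import textwrap

# Hypothetical script 1: the guard lives inside master(), as in the asker's update.
GUARD_INSIDE_MASTER = textwrap.dedent("""
    def master():
        print('master called')
        if __name__ == '__main__':
            print('master body ran')

    print('top-level code ran')
    master()
""")

# Hypothetical script 2: the guard wraps the call to master(), as in the answer.
GUARD_AROUND_CALL = textwrap.dedent("""
    def master():
        print('master called')
        print('master body ran')

    print('top-level code ran')
    if __name__ == '__main__':
        master()
""")


def run_as(source, run_name):
    """Run `source` from a temporary file with __name__ == run_name; return printed lines."""
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, 'script.py')
        with open(path, 'w') as f:
            f.write(source)
        buf = io.StringIO()
        with contextlib.redirect_stdout(buf):
            runpy.run_path(path, run_name=run_name)
        return buf.getvalue().splitlines()


# '__main__' plays the parent process; '__mp_main__' plays a spawned child.
for label, source in [('guard inside master', GUARD_INSIDE_MASTER),
                      ('guard around call', GUARD_AROUND_CALL)]:
    for run_name in ('__main__', '__mp_main__'):
        print(f'{label:20} as {run_name:12}: {run_as(source, run_name)}')
```

In the simulated child, the first script still prints 'master called' (master runs but does nothing useful), while the second never calls master at all. In both, the unguarded top-level print runs in every process.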
Btw, I took the liberty of putting your pool in a context manager, using the with statement. This automatically shuts down the Pool once the with block exits (its __exit__ calls terminate()), which is safe here because map() has already returned all of its results. I also removed the .join() because Pool().map() already blocks the main thread until every result is ready.
Lastly, I changed the temporary list you were creating in master to a call to range. range creates a sequence of numbers between its two arguments, inclusive on the left but exclusive on the right. With a single argument it starts at 0 and goes up to, but not including, that number; range(10) => 0 1 2 3 4 5 6 7 8 9. Note that range(1, 11) yields ten values (1 through 10), whereas your list [1,10] yielded only 1 and 10, so keep the list if you only want output1.csv and output10.csv.
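A minimal sketch of the three changes just described (context manager, no join(), range versus a list), assuming multiprocessing.pool.ThreadPool as a stand-in for Pool: ThreadPool offers the same map() and context-manager interface but uses threads, so the snippet runs anywhere without a __main__ guard. The names scale and run_parallel are hypothetical; the real code would call testFunc and write2dListToCSV.

```python
from multiprocessing.pool import ThreadPool


def scale(args):
    """Hypothetical worker standing in for testFunc: multiply a value by its multiplier."""
    value, mult = args
    return value * mult


def run_parallel(mult):
    """Hypothetical stand-in for runParallel, using ThreadPool instead of Pool."""
    inputs = [(i, mult) for i in range(5)]
    with ThreadPool(processes=2) as pool:
        # map() blocks until every result is ready, so no join() is needed
        # before `results` can be used.
        results = pool.map(scale, inputs)
    # Leaving the with block calls pool.terminate(); harmless here because
    # map() has already returned.
    return results


# range(1, 11) gives ten iterations (1 through 10); [1, 10] gives just two.
print(list(range(1, 11)))
print([run_parallel(mult) for mult in [1, 10]])
```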