Reputation: 1812
I have a script that opens each file from a list and then does something to the text within that file. I'm using Python multiprocessing and Pool to try to parallelize this operation. An abstraction of the script is below:
import os
from multiprocessing import Pool

results = []

def testFunc(files):
    for file in files:
        print "Working in Process #%d" % (os.getpid())
        #This is just an illustration of some logic. This is not what I'm actually doing.
        for line in file:
            if 'dog' in line:
                results.append(line)

if __name__=="__main__":
    p = Pool(processes=2)
    files = ['/path/to/file1.txt', '/path/to/file2.txt']
    results = p.apply_async(testFunc, args = (files,))
    results2 = results.get()
When I run this, the printed process ID is the same for each iteration. Basically, what I'm trying to do is take each element of the input list and fork it out to a separate process, but it seems like one process is doing all of the work.
Upvotes: 19
Views: 30392
Reputation: 879361
apply_async farms out one task to the pool. You would need to call apply_async many times to exercise more processors.

Don't let both processes try to write to the global list results. Since the pool workers are separate processes, the two won't be writing to the same list. One way to work around this is to use an output Queue. You could set it up yourself, or use apply_async's callback to set up the Queue for you. apply_async will call the callback once the function completes.

You could use map_async instead of apply_async, but then you'd get a list of lists, which you'd then have to flatten.

So, perhaps try instead something like:
import os
import multiprocessing as mp

results = []

def testFunc(file):
    result = []
    print("Working in Process #%d" % (os.getpid()))
    # This is just an illustration of some logic. This is not what I'm
    # actually doing.
    with open(file, 'r') as f:
        for line in f:
            if 'dog' in line:
                result.append(line)
    return result

def collect_results(result):
    # Runs in the parent process, so it can safely extend the global list.
    results.extend(result)

if __name__ == "__main__":
    p = mp.Pool(processes=2)
    files = ['/path/to/file1.txt', '/path/to/file2.txt']
    # Submit one task per file so the pool can spread them across workers.
    for f in files:
        p.apply_async(testFunc, args=(f, ), callback=collect_results)
    p.close()
    p.join()
    print(results)
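
For comparison, here is a minimal sketch of the map_async route mentioned above, where each task returns its own list and the parent flattens the list of lists afterwards. The names find_matches and flatten, the needle parameter, and the temporary sample files are all assumptions made for illustration; they are not part of the original script.

```python
import itertools
import multiprocessing as mp
import os
import tempfile


def find_matches(path, needle="dog"):
    """Return the lines of one file that contain `needle` (runs in a worker)."""
    with open(path, "r") as f:
        return [line for line in f if needle in line]


def flatten(list_of_lists):
    """Join the per-file result lists into one flat list, preserving order."""
    return list(itertools.chain.from_iterable(list_of_lists))


if __name__ == "__main__":
    # Hypothetical sample files, created only so the sketch runs as-is.
    tmpdir = tempfile.mkdtemp()
    paths = []
    for name, text in [("a.txt", "dog one\ncat\n"), ("b.txt", "bird\ndog two\n")]:
        path = os.path.join(tmpdir, name)
        with open(path, "w") as f:
            f.write(text)
        paths.append(path)

    pool = mp.Pool(processes=2)
    # map_async submits one task per path; get() blocks until all are done
    # and returns one list of matching lines per file, in input order.
    per_file = pool.map_async(find_matches, paths).get()
    pool.close()
    pool.join()
    print(flatten(per_file))
```

Compared with the callback version, this keeps the results in the same order as the input files, at the cost of the extra flattening step.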
Upvotes: 32
Reputation: 16308
Maybe in this case you should use map_async:
import os
from multiprocessing import Pool

def testFunc(file):
    # Each worker reports its own process id.
    print("Working in Process #%d" % (os.getpid()))
    matches = []
    #This is just an illustration of some logic. This is not what I'm actually doing.
    with open(file, 'r') as f:
        for line in f:
            if 'dog' in line:
                matches.append(line)
    # Return the matches; a global list appended to in a worker would not
    # be visible in the parent process.
    return matches

if __name__=="__main__":
    p = Pool(processes=2)
    files = ['/path/to/file1.txt', '/path/to/file2.txt']
    # map_async submits one task per file; get() returns one list per file.
    results = p.map_async(testFunc, files)
    print(results.get())
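
As a side note on why results have to be returned rather than appended to a module-level list: pool workers are separate processes, so each one only modifies its own copy of that list. A small sketch that makes this visible (append_in_worker is a hypothetical name used only for this demonstration):

```python
import multiprocessing as mp

results = []


def append_in_worker(item):
    # This modifies the worker process's copy of `results`,
    # not the list that lives in the parent process.
    results.append(item)
    return item


if __name__ == "__main__":
    with mp.Pool(processes=2) as pool:
        returned = pool.map(append_in_worker, ["a", "b", "c"])
    print(returned)  # the return values do reach the parent
    print(results)   # the parent's list is still empty
```

Only the values returned from the worker function travel back to the parent, which is why both answers collect results through return values.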
Upvotes: 8