Reputation: 8234
I am experimenting with concurrent.futures.ProcessPoolExecutor
to parallelize a serial task. The serial task involves finding the number of occurrences of a given number within a number range. My code is shown below.
While it runs, I noticed from Task Manager / System Monitor / top that only one CPU/thread is ever in use, even though I gave the max_workers of ProcessPoolExecutor
a value greater than 1. Why is this the case? How can I parallelize my code using concurrent.futures?
My code was executed with Python 3.5.
import concurrent.futures as cf
from time import time

def _findmatch(nmax, number):
    print('def _findmatch(nmax, number):')
    start = time()
    match = []
    nlist = range(nmax)
    for n in nlist:
        if number in str(n):
            match.append(n)
    end = time() - start
    print("found {} in {}sec".format(len(match), end))
    return match

def _concurrent(nmax, number, workers):
    with cf.ProcessPoolExecutor(max_workers=workers) as executor:
        start = time()
        future = executor.submit(_findmatch, nmax, number)
        futures = future.result()
        found = len(futures)
        end = time() - start
        print('with statement of def _concurrent(nmax, number):')
        print("found {} in {}sec".format(found, end))
        return futures

if __name__ == '__main__':
    match = []
    nmax = int(1E8)
    number = str(5)  # Find this number
    workers = 3
    start = time()
    a = _concurrent(nmax, number, workers)
    end = time() - start
    print('main')
    print("found {} in {}sec".format(len(a), end))
Upvotes: 3
Views: 4208
Reputation: 17263
The problem with your code is that it submits only one task, which is then executed by one of the workers while the rest of them do nothing. You need to submit multiple tasks that the workers can execute in parallel.
The example below splits the search range into three tasks, each of which is executed by a different worker. The futures returned by submit
are collected in a list, and once all of them have been submitted, wait
is used to wait for them all to complete. If you call result
immediately after submitting a task, it blocks until that future completes.
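As a minimal sketch of that pitfall (hypothetical work function, not part of the code below):
import concurrent.futures as cf

def work(x):
    return x * x

if __name__ == '__main__':
    with cf.ProcessPoolExecutor(max_workers=3) as executor:
        # Calling result() right after submit() blocks on each task in turn,
        # so only one worker is ever busy and the pool runs effectively serially.
        results = [executor.submit(work, x).result() for x in range(3)]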
Note that instead of building a list of matching numbers, the code below just counts the numbers that contain the digit 5, in order to reduce memory usage:
import concurrent.futures as cf
from time import time

def _findmatch(nmin, nmax, number):
    print('def _findmatch', nmin, nmax, number)
    start = time()
    count = 0
    for n in range(nmin, nmax):
        if number in str(n):
            count += 1
    end = time() - start
    print("found {} in {}sec".format(count, end))
    return count

def _concurrent(nmax, number, workers):
    with cf.ProcessPoolExecutor(max_workers=workers) as executor:
        start = time()
        chunk = nmax // workers
        futures = []
        for i in range(workers):
            cstart = chunk * i
            # The last chunk runs up to nmax so the remainder is not lost.
            cstop = chunk * (i + 1) if i != workers - 1 else nmax
            futures.append(executor.submit(_findmatch, cstart, cstop, number))
        cf.wait(futures)  # block until every chunk has finished
        res = sum(f.result() for f in futures)
        end = time() - start
        print('with statement of def _concurrent(nmax, number):')
        print("found {} in {}sec".format(res, end))
        return res

if __name__ == '__main__':
    match = []
    nmax = int(1E8)
    number = str(5)  # Find this number
    workers = 3
    start = time()
    a = _concurrent(nmax, number, workers)
    end = time() - start
    print('main')
    print("found {} in {}sec".format(a, end))
Output:
def _findmatch 0 33333333 5
def _findmatch 33333333 66666666 5
def _findmatch 66666666 100000000 5
found 17190813 in 20.09431290626526sec
found 17190813 in 20.443560361862183sec
found 22571653 in 20.47660517692566sec
with statement of def _concurrent(nmax, number):
found 56953279 in 20.6196870803833sec
main
found 56953279 in 20.648695707321167sec
Upvotes: 1
Reputation: 11573
Running your code shows that all three workers are there, but two of them are sleeping. The problem is that executor.submit(_findmatch, nmax, number)
only tells one worker to execute the function _findmatch.
I don't understand what your code is doing, but basically you need to either call executor.submit
once per chunk or use map,
so that every _findmatch call
gets only the chunk it is assigned to.
Upvotes: 1
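For illustration, a minimal executor.map sketch of that second option, reusing the chunking and the counting version of _findmatch from the answer above (an assumed variant, not the poster's code):
import concurrent.futures as cf

def _findmatch(nmin, nmax, number):
    # Count numbers in [nmin, nmax) whose decimal form contains `number`.
    return sum(1 for n in range(nmin, nmax) if number in str(n))

if __name__ == '__main__':
    nmax, number, workers = int(1E8), str(5), 3
    chunk = nmax // workers
    starts = [i * chunk for i in range(workers)]
    stops = starts[1:] + [nmax]  # last chunk runs to nmax to cover any remainder
    with cf.ProcessPoolExecutor(max_workers=workers) as executor:
        # map() hands one (start, stop, number) triple to each worker and
        # yields the per-chunk counts in submission order.
        total = sum(executor.map(_findmatch, starts, stops, [number] * workers))
    print("found {}".format(total))
This distributes the work the same way as the submit/wait version, just with less bookkeeping.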