Sun Bear

Reputation: 8234

concurrent.futures issue: why only 1 worker?

I am experimenting with concurrent.futures.ProcessPoolExecutor to parallelize a serial task. The task finds how many numbers in a given range contain a given digit. My code is shown below.
While it runs, Task Manager / System Monitor / top shows that only one CPU/thread is constantly busy, even though I give max_workers of ProcessPoolExecutor a value greater than 1. Why is this the case? How can I parallelize my code using concurrent.futures? My code was executed with Python 3.5.

import concurrent.futures as cf
from time import time

def _findmatch(nmax, number):    
    print('def _findmatch(nmax, number):')
    start = time()
    match=[]
    nlist = range(nmax)
    for n in nlist:
        if number in str(n):match.append(n)
    end = time() - start
    print("found {} in {}sec".format(len(match),end))
    return match

def _concurrent(nmax, number, workers):
    with cf.ProcessPoolExecutor(max_workers=workers) as executor:
        start = time()
        future = executor.submit(_findmatch, nmax, number)
        futures = future.result()
        found = len(futures)
        end = time() - start
        print('with statement of def _concurrent(nmax, number):')
        print("found {} in {}sec".format(found, end))
    return futures

if __name__ == '__main__':
    match=[]
    nmax = int(1E8)
    number = str(5) # Find this number
    workers = 3
    start = time()
    a = _concurrent(nmax, number, workers)
    end = time() - start
    print('main')
    print("found {} in {}sec".format(len(a),end))

Upvotes: 3

Views: 4208

Answers (2)

niemmi

Reputation: 17263

The problem with your code is that it submits only one task, which is then executed by one of the workers while the rest of them do nothing. You need to submit multiple tasks that the workers can execute in parallel.

The example below splits the search range into three tasks, each of which can be executed by a different worker. The futures returned by submit are added to a list, and once all tasks have been submitted, wait is used to wait for all of them to complete. If you call result immediately after submitting a task, it blocks until that future completes, so the tasks would run one after another.

Note that instead of building a list of matches, the code below just counts the numbers that contain the digit 5, which reduces memory usage:

import concurrent.futures as cf
from time import time

def _findmatch(nmin, nmax, number):
    print('def _findmatch', nmin, nmax, number)
    start = time()
    count = 0
    for n in range(nmin, nmax):
        if number in str(n):
            count += 1
    end = time() - start
    print("found {} in {}sec".format(count,end))
    return count

def _concurrent(nmax, number, workers):
    with cf.ProcessPoolExecutor(max_workers=workers) as executor:
        start = time()
        chunk = nmax // workers
        futures = []

        for i in range(workers):
            cstart = chunk * i
            cstop = chunk * (i + 1) if i != workers - 1 else nmax

            futures.append(executor.submit(_findmatch, cstart, cstop, number))

        cf.wait(futures)
        res = sum(f.result() for f in futures)
        end = time() - start
        print('with statement of def _concurrent(nmax, number):')
        print("found {} in {}sec".format(res, end))
    return res

if __name__ == '__main__':
    nmax = int(1E8)
    number = str(5) # Find this number
    workers = 3
    start = time()
    a = _concurrent(nmax, number, workers)
    end = time() - start
    print('main')
    print("found {} in {}sec".format(a,end))

Output:

def _findmatch 0 33333333 5
def _findmatch 33333333 66666666 5
def _findmatch 66666666 100000000 5
found 17190813 in 20.09431290626526sec
found 17190813 in 20.443560361862183sec
found 22571653 in 20.47660517692566sec
with statement of def _concurrent(nmax, number):
found 56953279 in 20.6196870803833sec
main
found 56953279 in 20.648695707321167sec
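
As a sanity check on the total above: among the integers below 10**k, exactly 9**k avoid any one chosen non-zero digit, so 10**k - 9**k contain it. The sketch below compares that closed form with a brute-force count on small ranges; the function names are made up for this illustration and are not part of the answer's code.

```python
def count_with_digit(nmax, digit="5"):
    """Brute-force count of integers in [0, nmax) whose decimal form contains digit."""
    return sum(1 for n in range(nmax) if digit in str(n))


def closed_form(k):
    """Count of integers in [0, 10**k) that contain one given non-zero digit."""
    return 10**k - 9**k


if __name__ == "__main__":
    # Brute force and closed form agree on small ranges.
    for k in range(1, 6):
        assert count_with_digit(10**k) == closed_form(k)
    print(closed_form(8))  # 56953279, the total reported in the output above
```

The closed form only holds for a non-zero digit, because str(n) has no leading zeros.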

Upvotes: 1

hansaplast

Reputation: 11573

Running your code shows that all three workers exist, but two of them are sleeping. The problem is that executor.submit(_findmatch, nmax, number) only tells one worker to execute the function _findmatch.
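
One way to see this is to record which process runs each task. The following is a minimal sketch, not the asker's code: report_pid and worker_pids are hypothetical helpers, and the short sleep is only there so that pending tasks tend to spread over the idle workers. How many distinct PIDs appear for several tasks depends on scheduling, and newer Python versions may start worker processes lazily, so no exact output is claimed.

```python
import concurrent.futures as cf
import os
import time


def report_pid(task_id):
    """Sleep briefly, then return the task id and the PID of the process that ran it."""
    time.sleep(0.2)  # keep this worker busy so other tasks go to other workers
    return task_id, os.getpid()


def worker_pids(n_tasks, workers=3):
    """Submit n_tasks tasks and return the set of worker PIDs that executed them."""
    with cf.ProcessPoolExecutor(max_workers=workers) as executor:
        futures = [executor.submit(report_pid, i) for i in range(n_tasks)]
        return {f.result()[1] for f in futures}


if __name__ == "__main__":
    # A single submit produces a single task, so only one worker does any work.
    print("1 task  ->", len(worker_pids(1)), "worker process(es) used")
    # Several tasks can be picked up by several workers.
    print("6 tasks ->", len(worker_pids(6)), "worker process(es) used")
```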

I don't fully understand what your code is doing, but basically you need to do one of the following:

  • split the task into three even parts and send each part to a process using executor.submit
  • split the task into smaller chunks (say, chunks of 100 elements each) and use map so that every _findmatch call gets only the chunk assigned to it.
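
The second option could look roughly like the sketch below. It assumes the goal is a count rather than a list of matches; count_matches, make_chunks and parallel_count are names invented for this illustration, and the chunk size is an arbitrary choice. Each chunk is passed as a (start, stop) pair so that executor.map can hand one pair to each call.

```python
import concurrent.futures as cf


def count_matches(bounds, digit="5"):
    """Count integers in [start, stop) whose decimal form contains digit."""
    start, stop = bounds
    return sum(1 for n in range(start, stop) if digit in str(n))


def make_chunks(nmax, size):
    """Split range(nmax) into consecutive (start, stop) pairs of at most size items."""
    return [(lo, min(lo + size, nmax)) for lo in range(0, nmax, size)]


def parallel_count(nmax, workers=3, size=100000):
    """Count matches in range(nmax) by mapping chunks onto a process pool."""
    chunks = make_chunks(nmax, size)
    with cf.ProcessPoolExecutor(max_workers=workers) as executor:
        # map yields one count per chunk, in chunk order; sum combines them.
        return sum(executor.map(count_matches, chunks))


if __name__ == "__main__":
    print(parallel_count(10**6))  # 468559, i.e. 10**6 - 9**6
```

Very small chunks (such as 100 elements) mean many tiny tasks, and the cost of passing each one between processes can outweigh the work itself. Larger chunks, or the chunksize argument of map, reduce that overhead.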

Upvotes: 1
