Kramen

Reputation: 43

Python multiprocessing only using 1 processor

EDIT:

Thank you for the answers. I understand the problem now. I believe I have fixed it by changing the imap_unordered line, but now I have a different problem.

with multiprocessing.Pool(processes=proc) as p:
    for i in p.imap_unordered(run_task, iterable, chunksize=chunksize):
        print(i)

I purposely left out all the other prints to better understand why this isn't working. It outputs:

File ".\test.py", line 23, in test_run
    for i in p.imap(run_task, iterable, chunksize=chunksize):
TypeError: 'int' object is not iterable

I tried the same thing with a different function, instead of run_task:

def f(x):
    return x*x

with multiprocessing.Pool(processes=proc) as p:
    for i in p.imap_unordered(f, iterable, chunksize=chunksize):
        print(i)

It outputs correctly! So one would think that the problem is in the run_task function. So I changed run_task:

def run_task(iterable):
    return type(iterable)

As expected, the output was:

<class 'int'>
<class 'int'>
<class 'int'>
...

The problem is, isn't imap_unordered supposed to pass chunks of the iterable?

Why am I getting individual ints in the function instead of a sublist of chunks that it can work with?


Original

I have a very simple multiprocessing function that runs a dummy workload. I'm running it multiple times with different chunksizes for imap_unordered and different numbers of processes for the multiprocessing.Pool.

The problem is that the output shows that no matter how many processes or what chunksize I pass, imap_unordered passes the whole list to exactly 1 process.

I expected it to divide the list into chunks and give one chunk to each process, so that I would see each process receiving a list of a different size.

Am I missing something here?

I have the following code and output:

import multiprocessing


def run_task(iterable):
    # Task to be executed, simulating dummy work
    work_simul = 0
    for number in iterable:
        work_simul += number * number
    return len(iterable)


def test_run(proc, chunksize):
    # runs the function "run_task" with multiprocessing

    # defining our dummy iterable of length 5000
    iterable = [i**i for i in range(5000)]
    original_size = len(iterable)  # Size of the iterable for comparison
    results = {}
    with multiprocessing.Pool(processes=proc) as p:
        for process, r_value in \
                enumerate(p.imap_unordered(run_task, (iterable,),
                                           chunksize=chunksize)):
            # Add our process number and its return value into results so that we can compare performance here.
            results[process + 1] = r_value

    print(
        f"""Original size: {original_size}
Total process # {proc}\nChunksize # {chunksize}""")

    for key in results.keys():
        print(f"Process # {key}: has list length {results[key]}\n\n")


if __name__ == "__main__":
    test_run(1, 10)
    test_run(5, 10)
    test_run(10, 10)

    test_run(1, 100)
    test_run(5, 100)
    test_run(10, 100)

Output:

Original size: 5000
Total process # 1
Chunksize # 10
Process # 1: has list length 5000


Original size: 5000
Total process # 5
Chunksize # 10
Process # 1: has list length 5000


Original size: 5000
Total process # 10
Chunksize # 10
Process # 1: has list length 5000


Original size: 5000
Total process # 1
Chunksize # 100
Process # 1: has list length 5000


Original size: 5000
Total process # 5
Chunksize # 100
Process # 1: has list length 5000


Original size: 5000
Total process # 10
Chunksize # 100
Process # 1: has list length 5000

Upvotes: 0

Views: 697

Answers (1)

mrEvgenX

Reputation: 766

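According to the multiprocessing documentation:

imap_unordered is the same as imap except that the order of the results is arbitrary, and imap is a lazier version of map.

Simply put, they all apply a function to each element of the iterable. The chunksize argument only controls how many elements are sent to a worker in one batch; the function is still called once per element, never with a sublist.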
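A small sketch of this behaviour. It uses ThreadPool from multiprocessing.pool, which exposes the same map/imap interface as Pool and is swapped in here only so the snippet runs without a __main__ guard. The names describe and element_types are hypothetical, made up for this illustration:

```python
from multiprocessing.pool import ThreadPool


def describe(x):
    # Report the type of whatever the pool passes to the function.
    return type(x).__name__


def element_types(items, chunksize):
    # Collect the distinct argument types seen by describe().
    # ThreadPool stands in for Pool here (assumption: same calling convention).
    with ThreadPool(2) as pool:
        return set(pool.imap_unordered(describe, items, chunksize=chunksize))


# The function sees individual ints regardless of chunksize.
print(element_types([1, 2, 3, 4, 5, 6], chunksize=1))
print(element_types([1, 2, 3, 4, 5, 6], chunksize=3))
```

Whatever chunksize is used, the set only ever contains "int", which matches the <class 'int'> output in the question's edit.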

Calling

for process, r_value in \
        enumerate(p.imap_unordered(run_task, (iterable,),
                                   chunksize=chunksize)):
    # Add our process number and its return value into results so that we can compare performance here.
    results[process + 1] = r_value

you apply run_task to a tuple containing a single element (the whole list). There is only one task, so only one process can work on it.
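If the goal is for each call to receive a sublist, one option is to split the list yourself and pass the sublists as the items. This is a minimal sketch, not the only way to do it: make_chunks is a hypothetical helper, and a plain range(5000) replaces the question's i**i values to keep it quick.

```python
import multiprocessing


def run_task(chunk):
    # Receives one sublist per call; simulates work and returns its length.
    work_simul = 0
    for number in chunk:
        work_simul += number * number
    return len(chunk)


def make_chunks(items, size):
    # Hypothetical helper: consecutive sublists of at most `size` items.
    return [items[i:i + size] for i in range(0, len(items), size)]


if __name__ == "__main__":
    iterable = list(range(5000))  # simplified stand-in for the question's data
    chunks = make_chunks(iterable, 100)
    with multiprocessing.Pool(processes=5) as p:
        # Each item handed to run_task is now a sublist of up to 100 numbers.
        lengths = list(p.imap_unordered(run_task, chunks))
    print(len(lengths), sum(lengths))
```

Since every item is already a batch of work, the pool's own chunksize can stay at its default here. The last sublist is shorter when the list length is not a multiple of size.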

When you enumerate the results, you get tuples whose first element is not a process number. It is just the index of the result in the order the results are yielded.
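To see which worker actually handled an item, the function itself can report its process ID. A rough sketch, where tag_with_pid is a hypothetical name and the pool size and input range are arbitrary choices:

```python
import multiprocessing
import os


def tag_with_pid(x):
    # Return the input together with the PID of the process that handled it.
    return x, os.getpid()


if __name__ == "__main__":
    with multiprocessing.Pool(processes=4) as p:
        results = list(p.imap_unordered(tag_with_pid, range(20), chunksize=5))
    pids = {pid for _, pid in results}
    print(f"{len(results)} items handled by {len(pids)} worker process(es)")
```

How many distinct PIDs show up depends on scheduling, so a fast task may still be served by fewer workers than the pool contains.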

Upvotes: 2
