Reputation: 43
Thank you for the answers; I understand the problem now.
I believe I have fixed it by changing the imap_unordered
line, but now I have a different problem.
with multiprocessing.Pool(processes=proc) as p:
    for i in p.imap_unordered(run_task, iterable, chunksize=chunksize):
        print(i)
I purposely left out all the other prints to better understand why this isn't working. It outputs:
File ".\test.py", line 23, in test_run
for i in p.imap(run_task, iterable, chunksize=chunksize):
TypeError: 'int' object is not iterable
I tried the same thing with a different function instead of run_task:
def f(x):
    return x * x

with multiprocessing.Pool(processes=proc) as p:
    for i in p.imap_unordered(f, iterable, chunksize=chunksize):
        print(i)
It outputs correctly! So one would think the problem is in the run_task function. So I changed run_task:
def run_task(iterable):
    return type(iterable)
As expected, the output was:
<class 'int'>
<class 'int'>
<class 'int'>
...
The problem is: isn't imap_unordered
supposed to pass chunks of the iterable?
Why is the function receiving individual ints
instead of a sublist (chunk) it can work with?
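If each task genuinely needs a sublist, one workaround is to chunk the iterable yourself before handing it to the pool. A minimal sketch; the chunks helper here is hypothetical, not part of the original code:

import multiprocessing

def run_task(sublist):
    # Each task now receives an actual sublist to work on.
    return len(sublist)

def chunks(seq, size):
    # Hypothetical helper: yield successive sublists of at most `size` items.
    for i in range(0, len(seq), size):
        yield seq[i:i + size]

if __name__ == "__main__":
    iterable = list(range(5000))
    with multiprocessing.Pool(processes=5) as p:
        for length in p.imap_unordered(run_task, chunks(iterable, 100)):
            print(f"task received a sublist of length {length}")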
I have a very simple multiprocessing function that runs a dummy workload.
I'm running it multiple times with different chunksize values for imap_unordered
and different process counts for the multiprocessing.Pool.
The problem is that the output shows that, no matter how many processes
or what chunksize
I pass, imap_unordered
hands the whole list to exactly one process.
The expected behavior is that it divides the list into chunks, giving one chunk to each process, so I would see processes receiving lists of different sizes.
Am I missing something here?
I have the following code and output:
import multiprocessing

def run_task(iterable):
    # Task to be executed, simulating dummy work
    work_simul = 0
    for number in iterable:
        work_simul += number * number
    return len(iterable)
def test_run(proc, chunksize):
    # Runs the function "run_task" with multiprocessing.
    # Define our dummy iterable of length 5000.
    iterable = [i**i for i in range(5000)]
    original_size = len(iterable)  # size of the iterable for comparison
    results = {}
    with multiprocessing.Pool(processes=proc) as p:
        for process, r_value in \
                enumerate(p.imap_unordered(run_task, (iterable,),
                                           chunksize=chunksize)):
            # Add our process number and its return value into results
            # so that we can compare performance here.
            results[process + 1] = r_value
    print(f"""Original size: {original_size}
Total process # {proc}
Chunksize # {chunksize}""")
    for key in results.keys():
        print(f"Process # {key}: has list length {results[key]}\n\n")
if __name__ == "__main__":
    test_run(1, 10)
    test_run(5, 10)
    test_run(10, 10)
    test_run(1, 100)
    test_run(5, 100)
    test_run(10, 100)
Output:
Original size: 5000
Total process # 1
Chunksize # 10
Process # 1: has list length 5000

Original size: 5000
Total process # 5
Chunksize # 10
Process # 1: has list length 5000

Original size: 5000
Total process # 10
Chunksize # 10
Process # 1: has list length 5000

Original size: 5000
Total process # 1
Chunksize # 100
Process # 1: has list length 5000

Original size: 5000
Total process # 5
Chunksize # 100
Process # 1: has list length 5000

Original size: 5000
Total process # 10
Chunksize # 100
Process # 1: has list length 5000
Upvotes: 0
Views: 697
Reputation: 766
Referring to the multiprocessing documentation: imap_unordered
is equivalent to imap
(itself the parallel analogue of itertools'
imap), and imap
is similar to map.
Simply put, they all apply a function to each element of the iterable, one element per call; chunksize only controls how many elements are shipped to a worker in one batch, not what the function receives.
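A quick way to see this (a minimal sketch, assuming two worker processes): even with chunksize greater than 1, the function is still called once per element.

import multiprocessing

def tag(x):
    # Called once per element, even with chunksize > 1;
    # chunksize only batches how elements reach the workers.
    return x, multiprocessing.current_process().name

if __name__ == "__main__":
    with multiprocessing.Pool(processes=2) as p:
        for value, worker in p.imap_unordered(tag, range(6), chunksize=3):
            print(value, "was handled by", worker)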
Calling

for process, r_value in \
        enumerate(p.imap_unordered(run_task, (iterable,),
                                   chunksize=chunksize)):
    # Add our process number and its return value into results
    # so that we can compare performance here.
    results[process + 1] = r_value
you apply run_task
to a tuple containing a single element (the whole list), so there is only one task to distribute and only one process can run it.
When you enumerate the results, the first element of each pair is not a process number; it is simply the index of the result in the output sequence.
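For reference, a minimal sketch of how the test could be reworked so the pool actually distributes work, assuming per-element processing is acceptable: pass the list itself rather than a one-element tuple, and let run_task take a single number.

import multiprocessing

def run_task(number):
    # Operates on a single element; Pool.imap_unordered never
    # passes sublists, regardless of chunksize.
    return number * number

def test_run(proc, chunksize):
    iterable = list(range(5000))
    with multiprocessing.Pool(processes=proc) as p:
        results = list(p.imap_unordered(run_task, iterable,
                                        chunksize=chunksize))
    print(f"Got {len(results)} results using {proc} processes")

if __name__ == "__main__":
    test_run(5, 100)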
Upvotes: 2