So I thought I'd finally post: what is the proper way to manage Process workers? I've tried to use a Pool, but I noticed I could not get the return value of each completed process. I tried to use a callback, but that didn't work as expected either. Should I just be managing them myself with active_children()?
My Pool code:
from multiprocessing import *
import time
import random

SOME_LIST = []

def myfunc():
    a = random.randint(0, 3)
    time.sleep(a)
    return a

def cb(retval):
    SOME_LIST.append(retval)

print("Starting...")
p = Pool(processes=8)
p.apply_async(myfunc, callback=cb)
p.close()
p.join()
print("Stopping...")
print(SOME_LIST)
I expect a list of values, but all I get is the return value of the last job to complete:
$ python multi.py
Starting...
Stopping...
[3]
Note: the answer should not use the threading module; here is the reason why:
In CPython, due to the Global Interpreter Lock, only one thread can execute Python code at once (even though certain performance-oriented libraries might overcome this limitation). If you want your application to make better use of the computational resources of multi-core machines, you are advised to use multiprocessing.
Upvotes: 0
Views: 1615
Reputation: 94871
You're misunderstanding the way apply_async works. It doesn't call the function you pass to it in every process in the Pool. It calls the function just once, in one of the worker processes. So the results you're seeing are to be expected. You have a couple of options to get the behavior you want:
from multiprocessing import Pool
import time
import random

SOME_LIST = []

def myfunc():
    a = random.randint(0, 3)
    time.sleep(a)
    return a

def cb(retval):
    SOME_LIST.append(retval)

print("Starting...")
p = Pool(processes=8)
# Submit one task per worker process. (_processes is an internal
# attribute of Pool; here it is just 8.)
for _ in range(p._processes):
    p.apply_async(myfunc, callback=cb)
p.close()
p.join()
print("Stopping...")
print(SOME_LIST)
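As a side note, a minimal variant (not part of the original answer) skips the callback and the global list entirely: apply_async returns an AsyncResult object, and calling get() on each one retrieves the return values directly:

from multiprocessing import Pool
import time
import random

def myfunc():
    a = random.randint(0, 3)
    time.sleep(a)
    return a

if __name__ == "__main__":
    print("Starting...")
    p = Pool(processes=8)
    # apply_async returns an AsyncResult; keep them all and call
    # get() later to retrieve each return value (get() blocks until
    # the result is ready).
    results = [p.apply_async(myfunc) for _ in range(8)]
    p.close()
    p.join()
    print("Stopping...")
    print([r.get() for r in results])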
Or, using map:
from multiprocessing import Pool
import time
import random

def myfunc(_):
    # map passes each item of the iterable as an argument, so the
    # function has to accept (and here, ignore) one parameter.
    a = random.randint(0, 3)
    time.sleep(a)
    return a

print("Starting...")
p = Pool(processes=8)
SOME_LIST = p.map(myfunc, range(p._processes))
p.close()
p.join()
print("Stopping...")
print(SOME_LIST)
Note that you could also call apply_async or map with more tasks than there are processes in the pool. The idea of the Pool is that it guarantees exactly num_processes processes will be running for the entire lifetime of the Pool, no matter how many tasks you submit. So if you create a Pool(8) and call apply_async once, one of your eight workers will get a task, and the other seven will be idle. If you create a Pool(8) and call apply_async 80 times, the 80 tasks will get distributed to your eight workers, with no more than eight of the tasks actually being processed at once.
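To illustrate that last point, here is a minimal sketch (not from the original answer) that submits 80 tasks to an eight-worker pool; you still get 80 results back, even though at most eight tasks run concurrently:

from multiprocessing import Pool
import time
import random

def myfunc(_):
    a = random.randint(0, 3)
    time.sleep(a)
    return a

if __name__ == "__main__":
    p = Pool(processes=8)
    # 80 tasks queued against 8 workers: each worker picks up a new
    # task as soon as it finishes its current one.
    results = [p.apply_async(myfunc, (i,)) for i in range(80)]
    p.close()
    p.join()
    # All tasks have finished after join(), so get() returns
    # immediately for each of the 80 results.
    print(len([r.get() for r in results]))  # 80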
Upvotes: 6