Reputation: 2893
I want to use a queue to hold the results because I want a single consumer (serial, not parallel) to process the workers' results as the workers produce them.
For now, I want to know why the following program hangs.
import multiprocessing as mp
import time
import numpy as np

def worker(arg):
    time.sleep(0.2)
    q, arr = arg
    q.put(arr[0])

p = mp.Pool(4)
x = np.array([4,4])
q = mp.Queue()
for i in range(4):
    x[0] = i
    #worker((q,x))
    p.apply_async(worker, args=((q, x),))
print("done_apply")
time.sleep(0.2)
for i in range(4):
    print(q.get())
Upvotes: 1
Views: 685
Reputation: 104792
I think your choice to use multiprocessing.Pool alongside your own queue is the source of the main problems you're having. Using a pool creates the child processes up front, which jobs are later assigned to. But since you can't (easily) pass a queue to an already existing process, that's not a good match for your problem.
Instead, you should either get rid of your own queue and use the queue that's built into the pool to get a value returned by worker, or scrap the pool completely and use multiprocessing.Process to start a new process for each task you have to do.
I'd also note that your code has a race condition in the main process between the main thread that modifies the x array and the pool's background thread that serializes the old value before it's sent to a worker process. Much of the time you'll probably end up sending many copies of the same array (with the final value) instead of the several different values you intend.
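A minimal sketch of that race, assuming a hypothetical helper first_element and collecting results through the AsyncResult objects that apply_async returns instead of a queue. The shared-array run is timing-dependent; copying the array before each submission gives every task its own value.

```python
import multiprocessing as mp

import numpy as np


def first_element(arr):
    # Convert to a plain int so the results compare cleanly.
    return int(arr[0])


def collect(copy_each_time):
    x = np.array([4, 4])
    with mp.Pool(4) as pool:
        pending = []
        for i in range(4):
            x[0] = i
            # apply_async only queues the task; the pool's task-handler thread
            # pickles the argument later, possibly after x has changed again.
            arg = x.copy() if copy_each_time else x
            pending.append(pool.apply_async(first_element, (arg,)))
        # Read the results in submission order before the pool shuts down.
        return [r.get() for r in pending]


if __name__ == "__main__":
    print("shared:", collect(copy_each_time=False))  # timing-dependent, often all 3s
    print("copied:", collect(copy_each_time=True))
```

Calling x.copy() is the cheapest fix when the loop must reuse one buffer; building a fresh array per task, as the versions below do, works just as well.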
Here's a quick and untested version that drops the queue:
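import multiprocessing as mp
import time
import numpy as np

def worker(arr):
    time.sleep(0.2)
    return arr[0]

if __name__ == "__main__":
    p = mp.Pool(4)
    results = p.map(worker, [np.array([i, 4]) for i in range(4)])
    p.close()  # join() raises ValueError unless close() or terminate() is called first
    p.join()
    for result in results:
        print(result)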
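Pool.map waits for every task before returning. If the goal is a serial consumer that handles each result as soon as it's ready, Pool.imap_unordered yields return values in completion order through the pool's own result channel, with no separate queue. A sketch, assuming plain integers stand in for the arrays and a hypothetical consume function plays the serial consumer:

```python
import multiprocessing as mp
import time


def worker(i):
    time.sleep(0.2)
    return i


def consume(result, log):
    # Hypothetical serial consumer: runs only in the parent, one result at a time.
    log.append(result)


def run(n_tasks=4):
    log = []
    with mp.Pool(4) as pool:
        # Results arrive in completion order, not submission order.
        for result in pool.imap_unordered(worker, range(n_tasks)):
            consume(result, log)
    return log


if __name__ == "__main__":
    print(run())
```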
And here's a version that drops the Pool and keeps the queue:
import multiprocessing as mp
import time
import numpy as np

def worker(q, arr):
    time.sleep(0.2)
    q.put(arr[0])

if __name__ == "__main__":
    q = mp.Queue()
    processes = []
    for i in range(4):
        p = mp.Process(target=worker, args=(q, np.array([i, 4])))
        p.start()
        processes.append(p)
    for i in range(4):
        print(q.get())
    for p in processes:
        p.join()
Note that in the last version it may be important that we get the results from the queue before we try to join the processes (though probably not if we're only dealing with four values). If the pipe underlying the queue were to fill up (for example, because the results are large), a deadlock could occur if we did it in the other order: the worker might be blocked trying to write to the queue, while the main process is blocked waiting for the worker process to exit.
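A sketch of the safe ordering with results large enough to matter. The 1 MB payload size is an arbitrary assumption, chosen to be well above a typical OS pipe buffer. Each producer's background feeder thread can't finish writing until the parent reads, and the producer won't exit until that write is done, so joining first could block forever.

```python
import multiprocessing as mp

PAYLOAD_BYTES = 1_000_000  # assumption: far larger than a typical pipe buffer


def producer(q, i):
    # put() hands the item to a background feeder thread; the process will not
    # exit until that thread has written it, which needs the parent to read.
    q.put((i, bytes(PAYLOAD_BYTES)))


def run(n=4):
    q = mp.Queue()
    procs = [mp.Process(target=producer, args=(q, i)) for i in range(n)]
    for p in procs:
        p.start()
    received = [q.get() for _ in procs]  # drain the queue first...
    for p in procs:
        p.join()  # ...then join; swapping these two loops can deadlock
    return sorted(i for i, _ in received)


if __name__ == "__main__":
    print(run())  # [0, 1, 2, 3]
```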
Upvotes: 1
Reputation: 140276
multiprocessing.Queue objects cannot be shared with pool workers by passing them as task arguments. I came to the same conclusion as the OP, first by finding this answer.
Unfortunately, there were other problems in this code (which doesn't make it an exact duplicate of the linked answer):
- When the arguments are passed as args=(q, x), worker(arg) should be worker(*arg) for the unpacking to work. Without that, my process locked up too. It should have thrown an exception, but apply_async stores a failed task's exception in the AsyncResult it returns and only re-raises it when .get() is called on that result, which this code never does.
- Reusing the same array x for every task makes the workers all return the same number (with apply it works, but not with apply_async).
Another thing: for the code to be portable, wrap the main code in if __name__ == "__main__":. This is required on Windows (and on macOS since Python 3.8, where spawn became the default start method) because of differences in how processes are spawned.
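The silent failure is easy to surface: apply_async keeps a failed task's error inside the AsyncResult it returns, and .get() re-raises it. A sketch (worker and try_submit are hypothetical names) showing that passing a plain mp.Queue as a task argument produces a RuntimeError that is only visible through that call, while a Manager queue goes through:

```python
import multiprocessing as mp


def worker(q, value):
    q.put(value)


def try_submit(q):
    with mp.Pool(2) as pool:
        result = pool.apply_async(worker, (q, 42))
        try:
            # Without this call, a failure stays hidden inside `result`.
            result.get(timeout=10)
        except RuntimeError as exc:
            return f"RuntimeError: {exc}"
        return "ok"


if __name__ == "__main__":
    print(try_submit(mp.Queue()))
    with mp.Manager() as manager:
        print(try_submit(manager.Queue()))
```

The same .get() call would also expose the TypeError from a worker(arg) / args=(q, x) mismatch.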
Fully fixed code that outputs 0,3,2,1 for me:
import multiprocessing as mp
import time
import numpy as np

def worker(*arg):  # there are 2 arguments to "worker"
#def worker(q, arr):  # is probably even better
    time.sleep(0.2)
    q, arr = arg
    q.put(arr[0])

if __name__ == "__main__":
    p = mp.Pool(4)
    m = mp.Manager()  # use a manager, Queue objects cannot be shared
    q = m.Queue()
    for i in range(4):
        x = np.array([4,4])  # create array each time (or make a copy)
        x[0] = i
        p.apply_async(worker, args=(q, x))
    print("done_apply")
    time.sleep(0.2)
    for i in range(4):
        print(q.get())
Upvotes: 1
Reputation: 2893
Changing apply_async to apply gives the error message:
"Queue objects should only be shared between processes through inheritance"
A solution:
import multiprocessing as mp
import time
import numpy as np

def worker(arg):
    time.sleep(0.2)
    q, arr = arg
    q.put(arr[0])

p = mp.Pool(4)
x = np.array([4,4])
m = mp.Manager()
q = m.Queue()
for i in range(4):
    x[0] = i
    #worker((q,x))
    p.apply_async(worker, args=((q, x),))
print("done_apply")
time.sleep(0.2)
for i in range(4):
    print(q.get())
Result:
done_apply
3
3
3
3
Apparently, I need to make copies of the numpy array manually, because the desired result is 0, 1, 2, 3 in any order, not 3, 3, 3, 3.
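As the error message above says, a plain multiprocessing.Queue can be shared by inheritance. With a pool, one way to do that is to hand the queue to each worker process when it starts, through Pool's initializer and initargs parameters, instead of passing it with every task. A sketch, where init_worker and the module-level _result_queue are hypothetical names; each task receives its own integer, so there is no shared array to race on:

```python
import multiprocessing as mp
import time

_result_queue = None  # hypothetical global, set in each worker by init_worker


def init_worker(q):
    # Runs once in every pool process at startup; the queue arrives here by
    # inheritance, which is allowed, unlike pickling it as a task argument.
    global _result_queue
    _result_queue = q


def worker(i):
    time.sleep(0.2)
    _result_queue.put(i)


def run(n_tasks=4):
    q = mp.Queue()
    with mp.Pool(4, initializer=init_worker, initargs=(q,)) as pool:
        for i in range(n_tasks):
            pool.apply_async(worker, (i,))
        # Serial consumer in the parent: handle results as they arrive.
        results = [q.get() for _ in range(n_tasks)]
    return results


if __name__ == "__main__":
    print(run())
```

The order of the printed values depends on which worker finishes first.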
Upvotes: 1