hamster on wheels

Reputation: 2893

Pool, queue, hang

I want to use a queue to hold the results, because I want a consumer (serial, not parallel) to process the workers' results as the workers produce them.

For now, I want to know why the following program hangs.

import multiprocessing as mp
import time
import numpy as np
def worker(arg):
    time.sleep(0.2)
    q, arr = arg 
    q.put(arr[0])

p = mp.Pool(4)
x = np.array([4,4])
q = mp.Queue()

for i in range(4):
    x[0] = i 
    #worker((q,x))
    p.apply_async(worker, args=((q, x),)) 

print("done_apply")
time.sleep(0.2)
for i in range(4):
    print(q.get())

Upvotes: 1

Views: 685

Answers (3)

Blckknght

Reputation: 104792

I think your choice to use multiprocessing.Pool alongside your own queue is the source of the main problems you're having. A pool creates its child processes up front and assigns jobs to them later. But since you can't (easily) pass a queue to an already existing process, that's not a good match for your problem.

Instead, you should either get rid of your own queue and use the queue that's built into the pool to get the value returned by worker, or scrap the pool completely and use multiprocessing.Process to start a new process for each task you have to do.
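As a rough, untested sketch of the first option (imap_unordered is my suggestion here, not code from the question): the pool ships each worker's return value back through its own internal machinery, and the parent consumes results serially as each worker finishes, which is exactly the consume-as-produced behavior you describe.

import multiprocessing as mp
import time

def worker(i):
    time.sleep(0.2)
    return i  # the pool's internal result queue carries this back

if __name__ == "__main__":
    with mp.Pool(4) as pool:
        # results are yielded in completion order, one at a time,
        # so the parent consumes serially while workers run in parallel
        for result in pool.imap_unordered(worker, range(4)):
            print(result)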

I'd also note that your code has a race condition in the main process, between the main thread that modifies the x array and the thread that serializes the old value before it's sent to a worker process. Much of the time you'll probably end up sending many copies of the same array (with the final value) instead of the several different values you intend.

Here's a quick and untested version that drops the queue:

import multiprocessing as mp
import time
import numpy as np

def worker(arr):
    time.sleep(0.2)
    return arr[0]

if __name__ == "__main__":
    p = mp.Pool(4)
    results = p.map(worker, [np.array([i, 4]) for i in range(4)])
    p.close()  # close() must precede join() on a Pool
    p.join()
    for result in results:
        print(result)

And here's a version that drops the Pool and keeps the queue:

import multiprocessing as mp
import time
import numpy as np

def worker(q, arr):
    time.sleep(0.2)
    q.put(arr[0])

if __name__ == "__main__":
    q = mp.Queue()  # a plain Queue is fine when passed to Process at construction time
    processes = []

    for i in range(4):
        p = mp.Process(target=worker, args=(q, np.array([i, 4])))
        p.start()
        processes.append(p)

    for i in range(4):
        print(q.get())

    for p in processes:
        p.join()

Note that in the last version it may be important that we get the results from the queue before we try to join the processes (though probably not if we're only dealing with four values). If the queue were to fill up, a deadlock could occur if we did it in the other order. The worker might be blocked trying to write to the queue, while the main process is blocked waiting for the worker process to exit.
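To make that concrete, here's a small untested sketch of mine (the maxsize=1 is artificial, chosen just to make the queue fill up) where the drain-then-join order is essential:

import multiprocessing as mp

def worker(q, i):
    q.put(i)  # blocks whenever the queue is full

if __name__ == "__main__":
    q = mp.Queue(maxsize=1)  # deliberately tiny so the ordering matters
    processes = [mp.Process(target=worker, args=(q, i)) for i in range(4)]
    for p in processes:
        p.start()

    # Drain first: each q.get() frees a slot so every q.put() can complete...
    for i in range(4):
        print(q.get())

    # ...and only then join, which now cannot deadlock.
    for p in processes:
        p.join()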

Upvotes: 1

Jean-François Fabre

Reputation: 140276

Queue objects cannot be shared. I came to the same conclusion as the OP, at first by finding this answer.

Unfortunately, there were other problems in this code (which means it isn't an exact duplicate of the linked answer):

  • worker(arg) should be worker(*arg) for the argument unpacking to work. Without that, my process locked up too: the call fails with "TypeError: worker() takes 1 positional argument but 2 were given", but apply_async only surfaces a worker's exception when you call get() on the AsyncResult it returns, so the failure stays silent (see the sketch after this list)
  • passing the same x to all the workers results in the same number every time (it works with apply, but not with apply_async)
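Here's that sketch: a minimal, untested illustration (mine, not from the original post) of surfacing the hidden exception by keeping the AsyncResult that apply_async returns and calling get() on it:

import multiprocessing as mp

def worker(arg):  # expects a single argument...
    pass

if __name__ == "__main__":
    with mp.Pool(4) as p:
        m = mp.Manager()
        q = m.Queue()
        res = p.apply_async(worker, args=(q, [0]))  # ...but is sent two
        res.get()  # raises "TypeError: worker() takes 1 positional argument
                   # but 2 were given" instead of failing silently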

Another thing: for the code to be portable, wrap the main code in an if __name__ == "__main__": guard; this is required on Windows because of differences in process spawning.

Fully fixed code that outputs 0,3,2,1 for me:

import multiprocessing as mp
import time
import numpy as np
def worker(*arg):  # there are 2 arguments to "worker"
#def worker(q, arr):  # is probably even better
    time.sleep(0.2)
    q, arr = arg
    q.put(arr[0])

if __name__ == "__main__":
    p = mp.Pool(4)

    m = mp.Manager()  # use a manager, Queue objects cannot be shared
    q = m.Queue()

    for i in range(4):
        x = np.array([4,4])  # create array each time (or make a copy)
        x[0] = i
        p.apply_async(worker, args=(q, x))

    print("done_apply")
    time.sleep(0.2)
    for i in range(4):
        print(q.get())
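As a side note (my variation, untested): the time.sleep(0.2) isn't actually needed, since q.get() blocks until a result arrives; for a more deterministic wind-down you can close and join the pool before draining the queue:

    p.close()  # no more tasks will be submitted to the pool
    p.join()   # wait for every worker to finish before draining the queue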

Upvotes: 1

hamster on wheels

Reputation: 2893

Changing apply_async to apply gives this error message:

"Queue objects should only be shared between processes through inheritance"

A solution:

import multiprocessing as mp
import time
import numpy as np
def worker(arg):
    time.sleep(0.2)
    q, arr = arg
    q.put(arr[0])

p = mp.Pool(4)
x = np.array([4,4])
m = mp.Manager()
q = m.Queue()

for i in range(4):
    x[0] = i
    #worker((q,x))
    p.apply_async(worker, args=((q, x),))

print("done_apply")
time.sleep(0.2)
for i in range(4):
    print(q.get())

Result:

done_apply
3
3
3
3

Apparently, I need to manually make copies of the numpy array, because the desired result is 0, 1, 2, 3 in any order, instead of 3, 3, 3, 3.
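A sketch of what that fix might look like (untested; x.copy() is my addition, snapshotting the array before it is serialized, and the sleep is dropped since q.get() blocks anyway):

import multiprocessing as mp
import time
import numpy as np

def worker(arg):
    time.sleep(0.2)
    q, arr = arg
    q.put(arr[0])

if __name__ == "__main__":
    p = mp.Pool(4)
    x = np.array([4, 4])
    m = mp.Manager()
    q = m.Queue()

    for i in range(4):
        x[0] = i
        # x.copy() gives each task its own snapshot of the array
        p.apply_async(worker, args=((q, x.copy()),))

    for i in range(4):
        print(q.get())  # blocks until each worker has put its result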

Upvotes: 1
