mattm

Reputation: 5949

Why does passing a Queue cause this function to fail when called with Pool.apply_async?

When I pass a Queue object to a function called with Pool.apply_async, the function fails, as shown by ApplyResult.successful() returning False and by the absence of any print output. The body of the function does not appear to run at all in this case.

I had planned to use the Queue to synchronize the collection of results from separate processes, as suggested by the multiprocessing documentation, but passing the Queue causes the failure even when the function never uses it.

from multiprocessing import Pool, Queue
import time
from random import randint

def sample_function(name, results):
    delay_ms = randint(1, 10)
    print ("{} starting with delay {:d}".format(name, int(delay_ms)))
    time.sleep(delay_ms)
    # results argument is unused! 
    #results.put("{} result".format(name))
    print ("{} ending".format(name))

resultsQueue = Queue()
jobs = ['one','two','three','four', 'five', 'six']

pool = Pool(processes=4)
# fails
jobStatuses = [pool.apply_async(sample_function, args=(job, resultsQueue)) for job in jobs]
# succeeds
#jobStatuses = [pool.apply_async(sample_function, args=(job,'works with string argument')) for job in jobs]

pool.close()
print('closing: no more tasks')
pool.join()

for status in jobStatuses:
    print (status.ready(), status.successful())

while not resultsQueue.empty():
    print(resultsQueue.get())
print('All finished')

I can call the same function directly, without Pool.apply_async, and it succeeds: sample_function('test without pool', resultsQueue). Calling Pool.apply_async with a string in place of the Queue also succeeds.
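One way to see why the direct call and the apply_async call behave differently: apply_async has to pickle its arguments to send them to a worker process, while a direct call passes the Queue as an ordinary Python object. The sketch below assumes CPython 3, and try_pickle_queue is a hypothetical helper name. It pickles a Queue outside of process creation and returns the error message:

```python
import pickle
from multiprocessing import Queue


def try_pickle_queue():
    """Return the RuntimeError message raised when a multiprocessing.Queue
    is pickled outside of process spawning, or None if pickling succeeds."""
    q = Queue()
    try:
        # Queue.__getstate__ refuses to serialize unless a child process
        # is being spawned, which is what happens inside apply_async.
        pickle.dumps(q)
    except RuntimeError as exc:
        return str(exc)
    return None


if __name__ == '__main__':
    print(try_pickle_queue())
```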

Upvotes: 0

Views: 89

Answers (1)

Evya

Reputation: 2375

A RuntimeError occurs in each apply_async call that receives a multiprocessing.Queue, and it is silenced: the exception is stored in the ApplyResult instead of being raised.
Changing your code a little, I was able to trace it:

for status in jobStatuses:
    print(status.__dict__)

outputs:

{'_value': RuntimeError('Queue objects should only be shared between processes through inheritance',), '_success': False, '_callback': None, '_cache': {}, '_job': 0, '_error_callback': None, '_event': }

This is printed six times, once for each job.
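Inspecting __dict__ relies on private attributes. A sketch of a more direct check, assuming the same Queue-argument setup, is to call ApplyResult.get(), which re-raises the stored exception in the parent process. collect_errors is a hypothetical helper name used only for illustration:

```python
from multiprocessing import Pool, Queue


def sample_function(name, results):
    # Never reached: the task fails while being pickled for the worker.
    return "{} done".format(name)


def collect_errors(jobs):
    """Submit each job with a multiprocessing.Queue argument and return
    whatever ApplyResult.get() produces for it (result or exception)."""
    results_queue = Queue()
    outcomes = []
    with Pool(processes=2) as pool:
        statuses = [pool.apply_async(sample_function, args=(job, results_queue))
                    for job in jobs]
        for status in statuses:
            try:
                outcomes.append(status.get(timeout=10))
            except RuntimeError as exc:
                # get() re-raises the error that was stored in the ApplyResult.
                outcomes.append(exc)
    return outcomes


if __name__ == '__main__':
    for outcome in collect_errors(['one', 'two']):
        print(repr(outcome))
```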

Using a Manager().Queue() instead solves this: it returns a proxy object that can be pickled and passed to pool workers as an argument.
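A minimal sketch of that fix applied to the question's code. It assumes a short sleep in place of the original 1 to 10 seconds, and run_jobs is a hypothetical wrapper so the results can be returned:

```python
import time
from multiprocessing import Manager, Pool
from random import randint


def sample_function(name, results):
    time.sleep(randint(1, 3) / 10)  # short delay, in seconds
    # The Manager queue proxy survives pickling, so put() works in the worker.
    results.put("{} result".format(name))
    return name


def run_jobs(jobs):
    """Run every job in a Pool and return (queued results, successful flags)."""
    with Manager() as manager:
        results_queue = manager.Queue()
        with Pool(processes=4) as pool:
            statuses = [pool.apply_async(sample_function, args=(job, results_queue))
                        for job in jobs]
            pool.close()
            pool.join()
        # Every task has finished, so all put() calls have already completed.
        collected = []
        while not results_queue.empty():
            collected.append(results_queue.get())
        successes = [status.successful() for status in statuses]
    return collected, successes


if __name__ == '__main__':
    print(run_jobs(['one', 'two', 'three', 'four', 'five', 'six']))
```

The Manager runs a separate server process that owns the real queue; the proxy passed to each worker forwards put() and get() calls to that process, which is why it can be sent through apply_async.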

Upvotes: 3
