Reputation: 5949
When I pass a Queue object to a function called using Pool.apply_async, the function fails, as indicated by ApplyResult.successful()
and the absence of print output. The body of the function does not appear to run at all in this case.
I had planned to use the Queue to synchronize collection of results from separate processes as suggested by the multiprocessing documentation, but the Queue causes failure even if it is not actually used in the function.
from multiprocessing import Pool, Queue
import time
from random import randint
def sample_function(name, results):
delay_ms = randint(1, 10)
print ("{} starting with delay {:d}".format(name, int(delay_ms)))
time.sleep(delay_ms)
# results argument is unused!
#results.put("{} result".format(name))
print ("{} ending".format(name))
resultsQueue = Queue()
jobs = ['one','two','three','four', 'five', 'six']
pool = Pool(processes=4)
# fails
jobStatuses = [pool.apply_async(sample_function, args=(job, resultsQueue)) for job in jobs]
# succeeds
#jobStatuses = [pool.apply_async(sample_function, args=(job,'works with string argument')) for job in jobs]
pool.close()
print('closing: no more tasks')
pool.join()
for status in jobStatuses:
print (status.ready(), status.successful())
while not resultsQueue.empty():
print(resultsQueue.get())
print('All finished')
I can call the same function without Pool.apply_async
and it will succeed: sample_function('test without pool', resultsQueue)
. I can also call the Pool.apply_async
function with a string, and it will succeed.
Upvotes: 0
Views: 89
Reputation: 2375
There's a RuntimeError
occuring in each apply_async
call with a multiprocessing.Queue
which gets silenced.
Changing your code a little bit I was able to trace it:
for status in job_statuses:
print(status.__dict__)
outputs:
{'_value': RuntimeError('Queue objects should only be shared between processes through inheritance',), '_success': False, '_callback': None, '_cache': {}, '_job': 0, '_error_callback': None, '_event': }
x6 times.
Using a Manager().Queue()
which can be shared among processes solves this.
Upvotes: 3