Reputation: 23
I tried to run the following codes:
import multiprocessing
import time
def init_queue():
print("init g_queue start")
while not g_queue.empty():
g_queue.get()
for _index in range(10):
g_queue.put(_index)
print("init g_queue end")
return
def task_io(task_id):
print("IOTask[%s] start" % task_id)
print("the size of queue is %s" % g_queue.qsize())
while not g_queue.empty():
time.sleep(1)
try:
data = g_queue.get(block=True, timeout=1)
print("IOTask[%s] get data: %s" % (task_id, data))
except Exception as excep:
print("IOTask[%s] error: %s" % (task_id, str(excep)))
print("IOTask[%s] end" % task_id)
return
g_queue = multiprocessing.Queue()
if __name__ == '__main__':
print("the size of queue is %s" % g_queue.qsize())
init_queue()
print("the size of queue is %s" % g_queue.qsize())
time_0 = time.time()
process_list = [multiprocessing.Process(target=task_io, args=(i,)) for i in range(multiprocessing.cpu_count())]
for p in process_list:
p.start()
for p in process_list:
if p.is_alive():
p.join()
print("End:", time.time() - time_0, "\n")
what I got was the following:
the size of queue is 0
init g_queue start
init g_queue end
the size of queue is 10
IOTask[0] start
the size of queue is 0
IOTask[0] end
IOTask[1] start
the size of queue is 0
IOTask[1] end
('End:', 0.6480000019073486, '\n')
What I was expecting was
IOTask[0] start
the size of queue is 10
Because after initialization of g_queue, the size of queue was supposed to be 10, not 0. It seems like the queue is not in the shared memory. When the sub process starts, a copy of g_queue is created and its size is 0.
Why multiprocessing.queue is not in the shared memory? Please advise. Many thanks!
Upvotes: 2
Views: 3000
Reputation: 2255
You should pass your g_queue as a parameter, then it will work.
demo for using multiprocessing with queue
import multiprocessing
import time
def long_time_calculate(n, result_queue):
time.sleep(1)
result_queue.put(n)
if __name__ == '__main__':
result_queue = multiprocessing.Queue()
pool_size = multiprocessing.cpu_count() * 2
pool = multiprocessing.Pool(processes=pool_size, maxtasksperchild=4)
manager = multiprocessing.Manager()
result_queue = manager.Queue()
inputs = [(1, result_queue), (2, result_queue), (3, result_queue), (4, result_queue)]
for input in inputs:
pool.apply_async(long_time_calculate, input)
pool.close()
pool.join()
print(list(result_queue.get() for _ in inputs))
Upvotes: 3