Reputation: 884
Many of the tutorials on multiprocessing don't seem to address why the technique below works for threading but not for multiprocessing.
Why doesn't this work for multiprocessing, and what is the correct implementation for what I'm trying to do? Thank you!
Threading implementation (works fine, makes sense to me):
from threading import Thread
from Queue import Queue
from time import sleep

"""threading functions"""
def producer_thread(n):
    for x in range(10):
        thread_q.put(n)

def consumer_thread():
    while True:
        item = thread_q.get()
        print item

if __name__ == '__main__':
    thread_q = Queue()

    """works fine"""
    p_thread = Thread(target=producer_thread, args=(10,))
    c_thread = Thread(target=consumer_thread)
    c_thread.daemon = True
    p_thread.start(); c_thread.start()
    p_thread.join()

    """gives the daemon c_thread time to drain the queue before the main thread exits"""
    sleep(.001)
Output:
10
10
10
10
10
10
10
10
10
10
Multiprocessing implementation (looks identical to the threading version to me, but doesn't work at all):
from multiprocessing import Process, freeze_support
from Queue import Queue

"""multiprocessing functions"""
def producer_process(n):
    for x in range(10):
        process_q.put(n)

def consumer_process():
    while True:
        item = process_q.get()
        print item

if __name__ == '__main__':
    freeze_support()
    process_q = Queue()

    """computer explodes"""
    p_process = Process(target=producer_process, args=(10,))
    c_process = Process(target=consumer_process)
    c_process.daemon = True
    p_process.start(); c_process.start()
    p_process.join()
Output:
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "C:\Users\J\Anaconda\lib\multiprocessing\forking.py", line 381, in main
    self = load(from_parent)
  File "C:\Users\J\Anaconda\lib\pickle.py", line 1378, in load
    return Unpickler(file).load()
  File "C:\Users\J\Anaconda\lib\pickle.py", line 858, in load
    dispatch[key](self)
  File "C:\Users\J\Anaconda\lib\pickle.py", line 1090, in load_global
    klass = self.find_class(module, name)
  File "C:\Users\J\Anaconda\lib\pickle.py", line 1126, in find_class
    klass = getattr(mod, name)
AttributeError: 'module' object has no attribute 'get_successors'
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "C:\Users\J\Anaconda\lib\multiprocessing\forking.py", line 381, in main
    self = load(from_parent)
  File "C:\Users\J\Anaconda\lib\pickle.py", line 1378, in load
    return Unpickler(file).load()
  File "C:\Users\J\Anaconda\lib\pickle.py", line 858, in load
    dispatch[key](self)
  File "C:\Users\J\Anaconda\lib\pickle.py", line 1090, in load_global
    klass = self.find_class(module, name)
  File "C:\Users\J\Anaconda\lib\pickle.py", line 1126, in find_class
    klass = getattr(mod, name)
AttributeError: 'module' object has no attribute 'get_successors'
Process Process-33:
Traceback (most recent call last):
  File "C:\Users\J\Anaconda\lib\multiprocessing\process.py", line 258, in _bootstrap
    self.run()
  File "C:\Users\J\Anaconda\lib\multiprocessing\process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "C:\Users\J\Documents\Python Scripts\producer_consumer_test.py", line 18, in consumer
    item = q.get()
NameError: global name 'q' is not defined
Process Process-32:
Traceback (most recent call last):
  File "C:\Users\J\Anaconda\lib\multiprocessing\process.py", line 258, in _bootstrap
    self.run()
  File "C:\Users\J\Anaconda\lib\multiprocessing\process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "C:\Users\J\Documents\Python Scripts\producer_consumer_test.py", line 14, in producer
    q.put(n)
NameError: global name 'q' is not defined
Process Process-34:
Traceback (most recent call last):
  File "C:\Users\J\Anaconda\lib\multiprocessing\process.py", line 258, in _bootstrap
    self.run()
  File "C:\Users\J\Anaconda\lib\multiprocessing\process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "C:\Users\J\Documents\Python Scripts\producer_consumer_test.py", line 14, in producer
    q.put(n)
NameError: global name 'q' is not defined
Process Process-35:
Traceback (most recent call last):
  File "C:\Users\J\Anaconda\lib\multiprocessing\process.py", line 258, in _bootstrap
    self.run()
  File "C:\Users\J\Anaconda\lib\multiprocessing\process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "C:\Users\J\Documents\Python Scripts\producer_consumer_test.py", line 18, in consumer
    item = q.get()
NameError: global name 'q' is not defined
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "C:\Users\J\Anaconda\lib\multiprocessing\forking.py", line 381, in main
    self = load(from_parent)
  File "C:\Users\J\Anaconda\lib\pickle.py", line 1378, in load
    return Unpickler(file).load()
  File "C:\Users\J\Anaconda\lib\pickle.py", line 858, in load
    dispatch[key](self)
  File "C:\Users\J\Anaconda\lib\pickle.py", line 1090, in load_global
    klass = self.find_class(module, name)
  File "C:\Users\J\Anaconda\lib\pickle.py", line 1126, in find_class
    klass = getattr(mod, name)
AttributeError: 'module' object has no attribute 'consumer'
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "C:\Users\J\Anaconda\lib\multiprocessing\forking.py", line 381, in main
    self = load(from_parent)
  File "C:\Users\J\Anaconda\lib\pickle.py", line 1378, in load
    return Unpickler(file).load()
  File "C:\Users\J\Anaconda\lib\pickle.py", line 858, in load
    dispatch[key](self)
  File "C:\Users\J\Anaconda\lib\pickle.py", line 1090, in load_global
    klass = self.find_class(module, name)
  File "C:\Users\J\Anaconda\lib\pickle.py", line 1126, in find_class
    klass = getattr(mod, name)
AttributeError: 'module' object has no attribute 'producer'
Upvotes: 1
Views: 1103
Reputation: 1638
Queue.Queue is for multithreaded apps (https://docs.python.org/2/library/queue.html), not for multiprocess apps.
multiprocessing.Queue is for multiprocess apps: https://docs.python.org/2/library/multiprocessing.html#exchanging-objects-between-processes
According to the documentation, multiprocessing.Queue "is a near clone of Queue.Queue".
Besides multiprocessing.Queue there is JoinableQueue, which adds task_done() and join() methods, in case you need them.
In your example I don't think you need JoinableQueue. Did you try this:
from multiprocessing import (Process, Queue, freeze_support)

def producer(q, n):
    for x in range(n):
        q.put(x)
    q.put("end")

def consumer(q):
    while True:
        item = q.get()
        if item == "end":
            break
        print item

if __name__ == '__main__':
    freeze_support()
    q = Queue()
    c = Process(target=consumer, args=(q,))
    c.start()
    p = Process(target=producer, args=(q, 10))
    p.start()
    c.join()
Tested on Linux and Windows.
Upvotes: 2
Reputation: 884
Okay, (sorry?) to answer my own question: I found a working implementation of what I was trying to do. There is quite a bit of nuance going on.
First of all, multiprocessing needs a queue from the multiprocessing module rather than Queue.Queue, which only works between threads. I used a JoinableQueue, whose task_done()/join() methods let the main process wait until the queue has been drained.
Secondly, since the functions modify a shared queue, the queue needs to be passed to them as an argument -- child processes don't simply inherit the parent's globals the way threads do. Maybe this should have been obvious, but I overlooked it.
Thirdly, and perhaps most importantly, the child processes don't print to the stdout of the IDE's interpreter -- they print to the console's stdout, so you MUST run the script from the command line if you want to see the output.
"""multiprocessing functions"""
def producer_process(n, q):
for x in range(10):
q.put(n)
def consumer_process(q):
while True:
item = q.get()
print item
q.task_done()
if __name__ == '__main__':
from multiprocessing import Process, freeze_support, JoinableQueue
freeze_support()
process_q = JoinableQueue()
'''launch consumer process'''
c_process = Process(target=consumer_process, args=(process_q,))
c_process.daemon = True
c_process.start()
'''launch producer process'''
p_process = Process(target=producer_process, args=(10, process_q))
p_process.start()
p_process.join()
process_q.join()
print "Done"
Upvotes: 0
Reputation: 277
When I run the Thread version, I get:
  File "test.py", line 18, in <module>
    p_thread = Thread(target=producer, args=(10,))
NameError: name 'producer' is not defined
Also, I think one error in the multiprocessing version,
NameError: global name 'q' is not defined
points at a typo: nothing named "q" is defined anywhere in the posted code.
EDIT: Now I've run the thread version, and I find that fewer than ten "10"s are printed: typically three or four, and the count changes randomly from run to run. I'm using Python 2.7.5. Can you check this issue?
EDIT: I ran the mp version; there is no output or error message, and the program terminates quickly. I believe there is some issue with the logic, and it cannot be ignored. Fixing the thread version first may be of great help to you.
Upvotes: 0